Add t5common package #29

Merged · 15 commits · Mar 8, 2025

Changes from 13 commits
45 changes: 45 additions & 0 deletions .github/workflows/common-tests.yml
@@ -0,0 +1,45 @@
name: Run tests for common library

on:
push:
branches:
- main
pull_request:

jobs:
run-tests:
name: common-${{ matrix.name }}
runs-on: ${{ matrix.os }}
defaults:
run:
shell: bash
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}-${{ matrix.name }}
cancel-in-progress: true
strategy:
fail-fast: false
matrix:
include:
- { name: linux-python3.10-minimum , requirements: minimum, python-ver: "3.10", os: ubuntu-latest }
- { name: linux-python3.13 , requirements: pinned , python-ver: "3.13", os: ubuntu-latest }

steps:
- name: Checkout repository
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-ver }}

- name: Install dependencies
run: |
cd common
pip install -e .
pip install -r tests/requirements.txt

- name: Run tests
run: |
cd common
pytest tests

86 changes: 86 additions & 0 deletions common/README.md
@@ -0,0 +1,86 @@
# The t5common package
`t5common` is a Python package for common operations in the Taskforce5 commons. It provides:

- A class for connecting to and interacting with Jira (Python class `t5common.jira.JiraConnector`)
- A framework for polling Jira for new issues and starting workflows (commands `init-db`, `check-jira`, `check-jobs`, and `mark-job`)
- A class for building Slurm sbatch scripts (Python class `t5common.job.SlurmJob`)
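
For example, a minimal sketch of using `JiraConnector` directly (the constructor arguments and the shape of the `query` return value follow the package's own usage; the host, user, token path, and JQL string are illustrative):

```python
from t5common.jira import JiraConnector
from t5common.utils import read_token

# Connect using an API token stored on a single line in a file
jc = JiraConnector(jira_host="https://example.atlassian.net",
                   jira_user="automation@example.com",
                   jira_token=read_token("/path/to/token_file"))

# query() takes a JQL string and returns the parsed JSON response
for issue in jc.query('project = T5 AND status = "New"')['issues']:
    print(issue['key'])
```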


## Jira-based workflow automation

Workflows can be automatically triggered using the `init-db`, `check-jira`, `check-jobs`, and `mark-job` commands that come with
the `t5common` package.

### Configuring workflow automation

Workflow automation with Jira is configured using a YAML file. The YAML file must contain the following keys:

- `host` - the Jira host to query for new issues
- `user` - the username for connecting to Jira
- `token_file` - the path to a file containing the Jira API token. This file should contain the token on a single line
- `database` - the path to the SQLite database to use for tracking jobs associated with issues
- `job_directory` - the directory jobs are run from. A subdirectory named after the issue key is created here for each job
- `projects` - a list of objects containing the information needed to automate workflows for a Jira project. These objects must contain the following keys:
  - `project` - the project to query new issues for
  - `new_status` - the issue status indicating that an issue is new and should have a workflow run for it
  - `workflow_command` - the command to run to start a new workflow. This command should take the issue key as the first and only positional argument
  - `publish_command` - the command to run to publish results. This command should take the working directory the workflow was started from as the first and only positional argument

For more details, refer to the JSON schema in `t5common/jira/schema/config.json`.
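
As an illustration, a minimal configuration might look like the following (all values are hypothetical):

```yaml
host: https://example.atlassian.net
user: automation@example.com
token_file: /home/automation/.jira-token
database: /home/automation/jobs.db
job_directory: /home/automation/jobs
projects:
  - project: T5
    new_status: New
    workflow_command: run-t5-workflow
    publish_command: publish-t5-results
```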


### Initializing workflow automation

Once you have defined your configuration file, you will need to initialize the SQLite database using the `init-db` command.

```bash
init-db config.yaml
```

This database tracks jobs in four states:

- `WORKFLOW_STARTED` - Job has been picked up from Jira and workflow has been started
- `WORKFLOW_FINISHED` - Job execution has finished
- `PUBLISH_STARTED` - Result publishing script has been started
- `PUBLISHED` - Job results have been published
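
The database is managed through the package's `DBConnector` class. A short sketch of inspecting a job's state, based on how the `check-jira` command uses the class (the database path and issue key are illustrative):

```python
from t5common.jira.database import DBConnector

# Connect to the SQLite database named in the config file
dbc = DBConnector("sqlite:///jobs.db")

# job_state() returns None if no job has been started for the issue
state = dbc.job_state("T5-123")
print(state)

dbc.close()
```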


### Starting and checking on jobs

**This section documents the commands that must be run as cron jobs to automate workflow execution.**

Jobs can be started using the `check-jira` command.

```bash
check-jira config.yaml
```

This will check each project specified with the `projects` key in the configuration file, and start a job for each new issue. The
job is started by invoking the command specified with `workflow_command`, passing the issue key as the first and only argument.
The `workflow_command` command is invoked from a subdirectory named after the issue key, created in the directory specified
by the `job_directory` key of the configuration file.

Job status can be checked using the `check-jobs` command.

```bash
check-jobs config.yaml
```

This will check the database for jobs that have been marked as finished (see the `mark-job` command below). The `check-jobs` command
will run the command specified with `publish_command` in the config file, passing the path to the subdirectory the workflow
was executed from as the first and only argument.
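
For example, a crontab along these lines would poll every 15 minutes (the paths and interval are illustrative):

```bash
# Poll Jira for new issues and start a workflow for each
*/15 * * * * /opt/t5/bin/check-jira /home/automation/config.yaml
# Publish results for workflows that have marked themselves finished
*/15 * * * * /opt/t5/bin/check-jobs /home/automation/config.yaml
```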


### Updating jobs

**This section documents the command that workflows must use to report status back to the workflow automation system.**

Workflows need to indicate that jobs have finished or been published using the `mark-job` command. The first argument to this command
must be `finished` or `published`, indicating that the job has finished running or that its results have been published, respectively. The `mark-job`
command also takes an optional second argument specifying the job directory, which defaults to the current working directory.

```bash
mark-job finished
```

Workflows must ensure that the `t5common` package is installed in their environments, and that they call `mark-job` when steps are complete.
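
For example, a workflow script might end along these lines (a sketch; the analysis step is hypothetical):

```bash
#!/bin/bash
# check-jira invokes this script with the issue key as the only
# argument, from the job subdirectory created for the issue.
ISSUE_KEY="$1"

run-analysis "$ISSUE_KEY"   # hypothetical workflow step

# Tell the automation system this job's work is done, so that
# check-jobs will pick it up and run the publish command.
mark-job finished
```
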
35 changes: 35 additions & 0 deletions common/pyproject.toml
@@ -0,0 +1,35 @@
[build-system]
requires = ["setuptools>=45", "setuptools_scm[toml]>=6.2"]
build-backend = "setuptools.build_meta"

[project]
name = "t5common"
version = "0.1.0"
description = "A library for common routines needed for Taskforce5 code"
readme = "README.md" # Path to your README file
requires-python = ">=3.7"
license = { text = "Modified BSD" } # Specify your license here
authors = [
{ name = "Andrew Tritt", email = "[email protected]" }
]
keywords = ["t5", "common", "library"]
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
dependencies = [
"requests",
"jsonschema",
"sqlalchemy",
"pyyaml",
]

[project.scripts]
init-db = "t5common.jira.database:init_db"
check-jira = "t5common.jira.check_jira:main"
check-jobs = "t5common.jira.check_jobs:main"
mark-job = "t5common.jira.mark_job:main"

[tool.setuptools.package-data]
"t5common.jira" = ["schema/*.json"]
Empty file added common/t5common/__init__.py
Empty file.
1 change: 1 addition & 0 deletions common/t5common/jira/__init__.py
@@ -0,0 +1 @@
from .connector import JiraConnector
118 changes: 118 additions & 0 deletions common/t5common/jira/check_jira.py
@@ -0,0 +1,118 @@
import argparse
import asyncio
import json
import os
from os.path import abspath, relpath
import re

from .connector import JiraConnector
from .database import DBConnector
from .utils import load_config, get_job_env, open_wf_file
from ..utils import get_logger, read_token


logger = get_logger()


def format_query(config):
return 'project = {project} AND status = "{new_status}"'.format(**config)


async def initiate_job(issue, project_config, config):
logger.info(f"Initiating job for {issue}")
env, wd = get_job_env(issue, config)

# Make working directory for job
if os.path.exists(wd):
logger.error(f"Job already initiated for {issue} - {wd} exists")
return -1, None
else:
os.mkdir(wd)

# Add workflow info to the working directory for subsequent steps
wf_info = {
'issue': issue,
'database': relpath(abspath(config['database']), abspath(wd)),
}
with open_wf_file(wd, 'w') as f:
json.dump(wf_info, f)

# Set up the command to run in the subprocess
command = re.split(r'\s+', project_config['workflow_command'])
command.append(issue)

# Call the job command in a subprocess
logger.info(f"Executing workflow command for {issue}: {' '.join(command)}")
process = await asyncio.create_subprocess_exec(
command[0], *command[1:],
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
env=env,
cwd=wd,
)
# Read the output and error streams
stdout, _ = await process.communicate()

if process.returncode != 0:
logger.error(f"Workflow command for {issue} failed:\n{stdout.decode()}")
else:
msg = f"Workflow command for {issue} succeeded"
if len(stdout) > 0:
msg += f"\n{stdout.decode()}"
logger.info(msg)

return process.returncode, wd


async def check_jira(config):
# Connect to Jira
jc = JiraConnector(jira_host=config['host'],
jira_user=config['user'],
jira_token=read_token(config['token_file']))

database = config['database']
dbc = DBConnector(f"sqlite:///{database}")

# Check each project queue, and create a new job for each new issue
tasks = list()
issues = list()
for project_config in config['projects']:
query = format_query(project_config)
proj_issues = jc.query(query)['issues']
for issue in proj_issues:
key = issue['key']
state = dbc.job_state(key)
if state is not None:
continue
issues.append((project_config['project'], key))
            tasks.append(initiate_job(key, project_config, config))

results = await asyncio.gather(*tasks)
for (project, issue), (retcode, wd) in zip(issues, results):
if retcode == 0:
logger.info(f"Issue {issue} marked as workflow started")
dbc.start_job(issue, wd, project)
else:
logger.error(f"Issue {issue} failed -- not marking as workflow started")

dbc.close()


def main():
parser = argparse.ArgumentParser(description="Poll Jira projects and run a script for each issue.")
parser.add_argument('config', type=str, help='Path to the YAML configuration file')
args = parser.parse_args()

    config = load_config(args.config)
asyncio.run(check_jira(config))


if __name__ == "__main__":
main()
93 changes: 93 additions & 0 deletions common/t5common/jira/check_jobs.py
@@ -0,0 +1,93 @@
import argparse
import asyncio
import json
import re

from .database import DBConnector, WORKFLOW_FINISHED, PUBLISH_STARTED
from .utils import load_config, get_job_env, open_wf_file
from ..utils import get_logger


logger = get_logger()


async def publish_job(issue, project_config, config):
logger.info(f"Publishing results for {issue}")
env, wd = get_job_env(issue, config)

    # Read the workflow info written when the job was initiated
with open_wf_file(wd, 'r') as f:
wf_info = json.load(f)

# Set up the command to run in the subprocess
command = re.split(r'\s+', project_config['publish_command'])
command.append(wd)

# Call the job command in a subprocess
logger.info(f"Executing publish command for {issue}: {' '.join(command)}")
process = await asyncio.create_subprocess_exec(
command[0], *command[1:],
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
env=env,
cwd=wd,
)
# Read the output and error streams
stdout, _ = await process.communicate()

if process.returncode != 0:
logger.error(f"Publish command for {issue} failed:\n{stdout.decode()}")
else:
msg = f"Publish command for {issue} succeeded"
if len(stdout) > 0:
msg += f"\n{stdout.decode()}"
logger.info(msg)

return process.returncode, wd


async def check_jobs(config):
database = config['database']
dbc = DBConnector(f"sqlite:///{database}")

    # Check each project for finished workflows and start publishing their results
tasks = list()
issues = list()
for project_config in config['projects']:
jobs = dbc.get_jobs(WORKFLOW_FINISHED, project_config['project'])
for job in jobs:
issues.append(job.issue)
tasks.append(publish_job(job.issue, project_config, config))

results = await asyncio.gather(*tasks)
for issue, (retcode, wd) in zip(issues, results):
if retcode == 0:
logger.info(f"Issue {issue} marked as publishing started")
dbc.transition_job(issue, PUBLISH_STARTED)
else:
logger.error(f"Issue {issue} publishing start failed")

dbc.close()


def main():
parser = argparse.ArgumentParser(description="Poll Jira projects and run a script for each issue.")
parser.add_argument('config', type=str, help='Path to the YAML configuration file')
args = parser.parse_args()

    config = load_config(args.config)
asyncio.run(check_jobs(config))


if __name__ == "__main__":
main()