Merge pull request #79 from cmoussa1/calculate.job.usage
fairshare implementation part 1: add effective job usage factor calculation
mergify[bot] authored Dec 17, 2020
2 parents e5f8968 + 4cb23c5 commit f24b065
Showing 11 changed files with 749 additions and 142 deletions.
85 changes: 85 additions & 0 deletions src/bindings/python/flux/accounting/README.md
@@ -0,0 +1,85 @@
### Job Usage Factor Calculation Documentation

`calc_usage_factor()` is the function responsible for calculating a user's job usage factor as it relates to fairshare. The raw job usage factor is defined as the sum, over a user's jobs, of the product of the number of nodes used (`nnodes`) and the time elapsed (`t_elapsed`).

```
RawUsage = sum(nnodes * t_elapsed)
```
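A minimal sketch of this calculation, assuming dictionary-style job records whose elapsed time is derived from the `T_Run`/`T_Inactive` timestamps shown in the example further down:

```python
# Sketch only: assumes each job record exposes a node count and
# run/inactive timestamps, as in the job records shown below.
def raw_usage(job_records):
    total = 0.0
    for job in job_records:
        t_elapsed = job["t_inactive"] - job["t_run"]  # seconds the job ran
        total += job["nnodes"] * t_elapsed
    return total
```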

`create_db()` creates a new table **job_usage_factor_table** that is dynamically sized based on two options passed in when the database is first created: **PriorityDecayHalfLife** and **PriorityUsageResetPeriod**. Each parameter represents a number of weeks: **PriorityDecayHalfLife** is the length of one usage period, and **PriorityUsageResetPeriod** is the point beyond which jobs no longer play a part in calculating a usage factor. If these options aren't specified, the table defaults to 4 usage columns, each of which represents one week's worth of jobs.
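The number of usage columns follows the same computation used in `create_db.py` (visible in the diff below); `usage_bin_count()` is an illustrative name, not part of the API:

```python
import math

def usage_bin_count(priority_usage_reset_period=None, priority_decay_half_life=None):
    # default: 4 one-week bins
    if priority_usage_reset_period is None or priority_decay_half_life is None:
        return 4
    # one bin per half-life period within the reset period,
    # e.g. an 8-week reset period with a 2-week half-life -> 4 bins
    return math.ceil(int(priority_usage_reset_period) / int(priority_decay_half_life))
```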

The **job_usage_factor_table** stores past job usage factors for every user/bank combination found in the **association_table**. When a user is first added to **association_table**, they are also added to **job_usage_factor_table**.

```python
def calc_usage_factor(jobs_conn, acct_conn, user, bank, priority_decay_half_life=None, priority_usage_reset_period=None):
```

The value of **PriorityDecayHalfLife** determines the amount of time that represents one "usage period" of jobs. `calc_usage_factor()` uses `view_job_records()` to query the job archive and retrieve the user's jobs that completed in the time period specified. It saves the last-seen `t_inactive` timestamp in the **job_usage_factor_table** so that the next query it runs only looks for jobs that completed after the saved timestamp.
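A sketch of that incremental fetch; the job-archive table and column names here are assumptions, while `last_job_timestamp` is a real column in **job_usage_factor_table** (see the diff below):

```python
# Sketch only: "jobs" and its columns are assumed names for the
# job-archive schema; last_job_timestamp is read from job_usage_factor_table.
def fetch_new_job_records(jobs_conn, acct_conn, user, bank):
    cur = acct_conn.execute(
        "SELECT last_job_timestamp FROM job_usage_factor_table "
        "WHERE username=? AND bank=?",
        (user, bank),
    )
    last_seen = cur.fetchone()[0]
    # only fetch jobs that completed after the saved timestamp
    return jobs_conn.execute(
        "SELECT * FROM jobs WHERE username=? AND t_inactive > ?",
        (user, last_seen),
    ).fetchall()
```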

Past usage factors have a decay factor D (0.5) applied to them before they are added to the user's current usage factor.

```python
def apply_decay_factor(decay_factor, acct_conn, user=None, bank=None):
```

**usage_user_past** = `(D * U_period-1) + (D^2 * U_period-2) + (D^3 * U_period-3) + ...`
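A sketch of this weighting (the in-memory list form is an assumption; the real `apply_decay_factor()` reads the stored factors from the database):

```python
# Sketch only: past_factors is ordered newest period first, so the
# weights D, D^2, D^3, ... decay the oldest factors the most.
def decayed_past_usage(past_factors, decay_factor=0.5):
    return sum(
        decay_factor ** (i + 1) * factor
        for i, factor in enumerate(past_factors)
    )
```

With the worked example below, `decayed_past_usage([128.0, 64.0, 64.0, 16.0])` returns `89.0`.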

After the current usage factor is calculated, it is written to the first usage bin in **job_usage_factor_table**, and each older factor shifts down one bin. The oldest factor gets removed from the table since it is no longer needed.
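In list form, the rollover looks like this (a sketch; the real code operates on the table's usage columns):

```python
# Sketch only: the newest factor enters bin 0, every older factor
# shifts one bin toward the end, and the oldest value falls off.
def roll_usage_bins(bins, current_factor):
    return [current_factor] + bins[:-1]
```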

Then, a similar process is repeated to calculate the raw usage factor for all of the user's siblings' jobs in that same time period.

---

### An example of calculating the job usage factor


Let's say a user has the following job records from the most recent **PriorityDecayHalfLife**:

```
UserID Username JobID T_Submit T_Run T_Inactive Nodes R
0 1002 1002 102 1605633403.22141 1605635403.22141 1605637403.22141 2 {"version":1,"execution": {"R_lite":[{"rank":"0","children": {"core": "0"}}]}}
1 1002 1002 103 1605633403.22206 1605635403.22206 1605637403.22206 2 {"version":1,"execution": {"R_lite":[{"rank":"0","children": {"core": "0"}}]}}
2 1002 1002 104 1605633403.22285 1605635403.22286 1605637403.22286 2 {"version":1,"execution": {"R_lite":[{"rank":"0","children": {"core": "0"}}]}}
3 1002 1002 105 1605633403.22347 1605635403.22348 1605637403.22348 1 {"version":1,"execution": {"R_lite":[{"rank":"0","children": {"core": "0"}}]}}
4 1002 1002 106 1605633403.22416 1605635403.22416 1605637403.22416 1 {"version":1,"execution": {"R_lite":[{"rank":"0","children": {"core": "0"}}]}}
```

**total nodes used**: 8

**total time elapsed**: 10000.0

**usage_user_current**:

```
sum(nnodes * t_elapsed) = (2 * 2000) + (2 * 2000) + (2 * 2000) + (1 * 2000) + (1 * 2000)
= 4000 + 4000 + 4000 + 2000 + 2000
= 16000
```

And the user's past job usage factors (each one represents a **PriorityDecayHalfLife** period up to the **PriorityUsageResetPeriod**) consist of the following:

```
username bank usage_factor_period_0 usage_factor_period_1 usage_factor_period_2 usage_factor_period_3
0 1002 C 128.0000 64.00000 64.0000 16.00000
```

The past usage factors have the decay factor applied to them: `[64.0, 16.0, 8.0, 1.0]`

**usage_user_past**: `64.0 + 16.0 + 8.0 + 1.0 = 89`

**usage_user_historical**: (usage\_user\_current) + (usage\_user\_past) = 16000 + 89 = 16089
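The numbers above can be reproduced directly:

```python
# five jobs, each running 2000 seconds on 2, 2, 2, 1, and 1 nodes
usage_user_current = sum(n * 2000 for n in [2, 2, 2, 1, 1])    # 16000
past_factors = [128.0, 64.0, 64.0, 16.0]
usage_user_past = sum(0.5 ** (i + 1) * u
                      for i, u in enumerate(past_factors))     # 89.0
usage_user_historical = usage_user_current + usage_user_past   # 16089.0
```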

---

### A typical workflow using calc_usage_factor() and update_end_half_life_period()

Ultimately, a Python script (run via `cron` or the like) would utilize both `calc_usage_factor()` and `update_end_half_life_period()` in the following manner.


Every **PriorityCalcPeriod**, the script would go through the following steps (a minimal sketch follows the list):

- A list of user/bank combinations would be fetched from the `association_table` in the flux-accounting database.

- For every user/bank combination, `calc_usage_factor()` is called; any new job records are fetched from the job-archive DB and a historical usage factor is generated. The appropriate values throughout the flux-accounting database would be updated to reflect this new usage factor.

- After usage values have been calculated for every user/bank combination, `update_end_half_life_period()` is called to determine whether we have entered a new half-life period. If we have, the flux-accounting database is updated with the new timestamp that represents the end of the current half-life period.
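A minimal sketch of such a driver script; the import path and the exact signatures of `calc_usage_factor()` and `update_end_half_life_period()` are assumptions here:

```python
import sqlite3

# Assumption: both functions are importable from the job archive
# interface module; adjust to wherever they actually live.
from flux.accounting.job_archive_interface import (
    calc_usage_factor,
    update_end_half_life_period,
)

def run_usage_update(jobs_db_path, acct_db_path):
    jobs_conn = sqlite3.connect(jobs_db_path)
    acct_conn = sqlite3.connect(acct_db_path)

    # 1. fetch every user/bank combination from the accounting DB
    rows = acct_conn.execute(
        "SELECT username, bank FROM association_table"
    ).fetchall()

    # 2. recalculate the historical usage factor for each combination
    for username, bank in rows:
        calc_usage_factor(jobs_conn, acct_conn, username, bank)

    # 3. roll over to a new half-life period if one has started
    update_end_half_life_period(acct_conn)

    jobs_conn.close()
    acct_conn.close()
```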
26 changes: 19 additions & 7 deletions src/bindings/python/flux/accounting/accounting_cli_functions.py
@@ -94,7 +94,7 @@ def get_sub_banks(row):
# we've reached a bank with no sub banks
if len(dataframe) == 0:
select_associations_stmt = """
SELECT user_name, bank
SELECT username, bank
FROM association_table
WHERE bank=?
"""
@@ -136,7 +136,7 @@ def edit_bank(conn, bank, shares):
def view_user(conn, user):
try:
# get the information pertaining to a user in the Accounting DB
select_stmt = "SELECT * FROM association_table where user_name=?"
select_stmt = "SELECT * FROM association_table where username=?"
dataframe = pd.read_sql_query(select_stmt, conn, params=(user,))
# if the length of dataframe is 0, that means
# the user specified was not found in the table
@@ -150,15 +150,15 @@ def view_user(conn, user):

def add_user(conn, username, bank, admin_level=1, shares=1, max_jobs=1, max_wall_pj=60):

# insert the user values into the database
try:
# insert the user values into association_table
conn.execute(
"""
INSERT INTO association_table (
creation_time,
mod_time,
deleted,
user_name,
username,
admin_level,
bank,
shares,
@@ -181,21 +181,33 @@ def add_user(conn, username, bank, admin_level=1, shares=1, max_wall
)
# commit changes
conn.commit()
# insert the user values into job_usage_factor_table
conn.execute(
"""
INSERT INTO job_usage_factor_table (
username,
bank
)
VALUES (?, ?)
""",
(username, bank,),
)
conn.commit()
# make sure entry is unique
except sqlite3.IntegrityError as integrity_error:
print(integrity_error)


def delete_user(conn, user, bank):
# delete user account from association_table
delete_stmt = "DELETE FROM association_table WHERE user_name=? AND bank=?"
delete_stmt = "DELETE FROM association_table WHERE username=? AND bank=?"
cursor = conn.cursor()
cursor.execute(delete_stmt, (user, bank,))


def edit_user(conn, username, field, new_value):
fields = [
"user_name",
"username",
"admin_level",
"bank",
"shares",
@@ -208,7 +220,7 @@ def edit_user(conn, username, field, new_value):

# edit value in accounting database
conn.execute(
"UPDATE association_table SET " + the_field + "=? WHERE user_name=?",
"UPDATE association_table SET " + the_field + "=? WHERE username=?",
(new_value, username,),
)
# commit changes
106 changes: 103 additions & 3 deletions src/bindings/python/flux/accounting/create_db.py
@@ -15,8 +15,64 @@
import argparse
import sys
import pathlib
import math
import time

def create_db(filepath):

def add_usage_columns_to_table(
conn, table_name, priority_usage_reset_period=None, priority_decay_half_life=None
):
# the number of columns (or 'bins') holding usage factors is determined by
# the following:
#
# PriorityUsageResetPeriod / PriorityDecayHalfLife
#
# each parameter represents a number of weeks by which to hold usage
# factors up to the time period where jobs no longer play a factor in
# calculating a usage factor
column_name = "usage_factor_period_0"
if priority_decay_half_life != None and priority_usage_reset_period != None:
num_columns = math.ceil(
int(priority_usage_reset_period) / int(priority_decay_half_life)
)
else:
num_columns = 4
for i in range(num_columns):
alter_stmt = (
"ALTER TABLE "
+ table_name
+ " ADD COLUMN "
+ column_name
+ " REAL DEFAULT 0.0"
)
conn.execute(alter_stmt)
conn.commit()
column_name = "usage_factor_period_" + str(i + 1)


def set_half_life_period_end(conn, priority_decay_half_life=None):
if priority_decay_half_life != None:
# convert number of weeks to seconds; this will be appended
# to the current time to represent one 'half-life' period
# for the first usage bin
half_life_period = int(priority_decay_half_life) * 604800
half_life_period_end = time.time() + half_life_period
else:
half_life_period = 604800
half_life_period_end = time.time() + half_life_period

update_stmt = """
UPDATE t_half_life_period_table
SET end_half_life_period=?
WHERE cluster='cluster'
"""
conn.execute(update_stmt, (str(half_life_period_end),))
conn.commit()


def create_db(
filepath, priority_usage_reset_period=None, priority_decay_half_life=None
):
db_dir = pathlib.PosixPath(filepath).parent
db_dir.mkdir(parents=True, exist_ok=True)
try:
@@ -36,13 +92,13 @@ def create_db(filepath):
creation_time bigint(20) NOT NULL,
mod_time bigint(20) DEFAULT 0 NOT NULL,
deleted tinyint(4) DEFAULT 0 NOT NULL,
user_name tinytext NOT NULL,
username tinytext NOT NULL,
admin_level smallint(6) DEFAULT 1 NOT NULL,
bank tinytext NOT NULL,
shares int(11) DEFAULT 1 NOT NULL,
max_jobs int(11) NOT NULL,
max_wall_pj int(11) NOT NULL,
PRIMARY KEY (user_name, bank)
PRIMARY KEY (username, bank)
);"""
)
logging.info("Created association_table successfully")
@@ -61,4 +117,48 @@ def create_db(filepath):
)
logging.info("Created bank_table successfully")

# Job Usage Factor Table
# stores past job usage factors for users
logging.info("Creating job_usage_factor table in DB...")
conn.execute(
"""
CREATE TABLE IF NOT EXISTS job_usage_factor_table (
username tinytext NOT NULL,
bank tinytext NOT NULL,
last_job_timestamp real DEFAULT 0.0,
PRIMARY KEY (username, bank),
FOREIGN KEY (username, bank)
REFERENCES association_table (username, bank)
ON UPDATE CASCADE
ON DELETE CASCADE
);"""
)
add_usage_columns_to_table(
conn,
"job_usage_factor_table",
priority_usage_reset_period,
priority_decay_half_life,
)
logging.info("Created job_usage_factor_table successfully")

# Half Life Timestamp Table
# keeps track of current half-life period
logging.info("Creating t_half_life_period_table in DB...")
conn.execute(
"""
CREATE TABLE IF NOT EXISTS t_half_life_period_table (
cluster tinytext DEFAULT 'cluster',
end_half_life_period real DEFAULT 0.0
);"""
)
conn.execute(
"""
INSERT INTO t_half_life_period_table (cluster, end_half_life_period)
VALUES ('cluster', 0.0);
"""
)
set_half_life_period_end(conn, priority_decay_half_life)
logging.info("Created t_half_life_period_table successfully")

conn.close()
29 changes: 21 additions & 8 deletions src/bindings/python/flux/accounting/flux-account.py
@@ -11,12 +11,9 @@
###############################################################
import sqlite3
import argparse
import time
import sys
import os

import pandas as pd

import flux.accounting
from flux.accounting import accounting_cli_functions as aclif
from flux.accounting import job_archive_interface as jobs
@@ -32,8 +29,7 @@ def main():
SQLite instructions for the Flux Accounting Database.
"""
)
subparsers = parser.add_subparsers(help="sub-command help",
dest="subcommand")
subparsers = parser.add_subparsers(help="sub-command help", dest="subcommand")
subparsers.required = True

parser.add_argument(
@@ -64,7 +60,7 @@ def main():
"--admin-level", help="admin level", default=1, metavar="ADMIN_LEVEL",
)
subparser_add_user.add_argument(
"--account", help="account to charge jobs against", metavar="ACCOUNT",
"--bank", help="bank to charge jobs against", metavar="BANK",
)
subparser_add_user.add_argument(
"--parent-acct", help="parent account", default="", metavar="PARENT_ACCOUNT",
@@ -123,6 +119,19 @@ def main():
"create-db", help="create the flux-accounting database"
)
subparser_create_db.set_defaults(func="create_db")
subparser_create_db.add_argument(
"dbpath", help="specify location of database file", metavar=("DATABASE PATH")
)
subparser_create_db.add_argument(
"--priority-usage-reset-period",
help="the number of weeks at which usage information gets reset to 0",
metavar=("PRIORITY USAGE RESET PERIOD"),
)
subparser_create_db.add_argument(
"--priority-decay-half-life",
help="the contribution of historical usage in weeks on the composite usage value",
metavar=("PRIORITY DECAY HALF LIFE"),
)

subparser_add_bank = subparsers.add_parser("add-bank", help="add a new bank")
subparser_add_bank.set_defaults(func="add_bank")
@@ -172,7 +181,9 @@ def main():
# if we are creating the DB for the first time, we need
# to ONLY create the DB and then exit out successfully
if args.func == "create_db":
c.create_db(path)
c.create_db(
args.dbpath, args.priority_usage_reset_period, args.priority_decay_half_life
)
sys.exit(0)

# try to open database file; will exit with -1 if database file not found
@@ -183,6 +194,8 @@ def main():
db_uri = "file:" + path + "?mode=rw"
try:
conn = sqlite3.connect(db_uri, uri=True)
# set foreign keys constraint
conn.execute("PRAGMA foreign_keys = 1")
except sqlite3.OperationalError:
print(f"Unable to open database file: {db_uri}", file=sys.stderr)
sys.exit(1)
@@ -197,7 +210,7 @@ def main():
aclif.add_user(
conn,
args.username,
args.account,
args.bank,
args.admin_level,
args.shares,
args.max_jobs,