Skip to content

Commit

Permalink
Clear time is now displayed for bugged and aborted runs
Browse files Browse the repository at this point in the history
• For the aborted/bugged runs, the time may not be reliable, but it's better
  than manually determining the time for runs where only the logs were bugged.
• Constants have been separated into PTConstants and MiscConstants
• Version bump to 2.6.0
  • Loading branch information
Iterniam committed Oct 8, 2022
1 parent eaf4e93 commit 48f5be1
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 50 deletions.
4 changes: 2 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Profit-Taker Analyzer by ReVoltage#3425 #
# Rewritten by Iterniam#5829 #
# https://github.com/revoltage34/ptanalyzer #
# Requires Python 3.9 #
# Requires Python 3.10 #
#############################################
import socket
import traceback
Expand All @@ -17,7 +17,7 @@
from src.analyzer import Analyzer
from src.utils import color

VERSION = 'v2.5.2'
VERSION = 'v2.6.0'


def check_version():
Expand Down
132 changes: 87 additions & 45 deletions src/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from math import nan, isnan
from statistics import median
from time import sleep
from typing import Iterator, Callable
from typing import Iterator, Callable, Optional, Union

from sty import rs, fg

Expand All @@ -15,13 +15,7 @@
from src.utils import color, time_str, oxfordcomma


class Constants:
NICKNAME = 'Net [Info]: name: '
SQUAD_MEMBER = 'loadout loader finished.'
HEIST_START = 'jobId=/Lotus/Types/Gameplay/Venus/Jobs/Heists/HeistProfitTakerBountyFour'
HOST_MIGRATION = '"jobId" : "/Lotus/Types/Gameplay/Venus/Jobs/Heists/HeistProfitTakerBountyFour'
HEIST_ABORT = 'SetReturnToLobbyLevelArgs: '
ELEVATOR_EXIT = 'EidolonMP.lua: EIDOLONMP: Avatar left the zone'
class PTConstants:
SHIELD_SWITCH = 'SwitchShieldVulnerability'
SHIELD_PHASE_ENDINGS = {1: 'GiveItem Queuing resource load for Transmission: '
'/Lotus/Sounds/Dialog/FortunaOrbHeist/Business/DBntyFourInterPrTk0920TheBusiness',
Expand All @@ -41,6 +35,17 @@ class Constants:
FINAL_PHASE = 4


class MiscConstants:
NICKNAME = 'Net [Info]: name: '
SQUAD_MEMBER = 'loadout loader finished.'
HEIST_START = 'jobId=/Lotus/Types/Gameplay/Venus/Jobs/Heists/HeistProfitTakerBountyFour'
HOST_MIGRATION = '"jobId" : "/Lotus/Types/Gameplay/Venus/Jobs/Heists/HeistProfitTakerBountyFour'
HEIST_ABORT = 'SetReturnToLobbyLevelArgs: '
ELEVATOR_EXIT = 'EidolonMP.lua: EIDOLONMP: Avatar left the zone'
BACK_TO_TOWN = 'EidolonMP.lua: EIDOLONMP: TryTownTransition'
ABORT_MISSION = 'GameRulesImpl - changing state from SS_STARTED to SS_ENDING'


class RelRun:

def __init__(self,
Expand Down Expand Up @@ -174,6 +179,7 @@ def __init__(self, run_nr: int):
self.body_kill: dict[int, float] = {} # phase -> kill-time
self.pylon_start: dict[int, float] = {} # phase -> start-time
self.pylon_end: dict[int, float] = {} # phase -> end-time
self.final_time: Optional[float] = None

def __str__(self):
return '\n'.join((f'{key}: {val}' for key, val in vars(self).items()))
Expand All @@ -191,7 +197,7 @@ def post_process(self) -> None:
try:
self.shield_phases[4].pop()
except IndexError:
raise BuggedRun(['No shields were recorded in phase 4.']) from None
raise BuggedRun(self, ['No shields were recorded in phase 4.']) from None

def check_run_integrity(self) -> None:
"""
Expand Down Expand Up @@ -239,7 +245,7 @@ def check_run_integrity(self) -> None:
failure_reasons.append(f'No pylon phase end time was recorded in phase {phase}.')

if failure_reasons:
raise BuggedRun(failure_reasons)
raise BuggedRun(self, failure_reasons)
# Else: return none implicitly

def to_rel(self) -> RelRun:
Expand Down Expand Up @@ -290,12 +296,20 @@ def to_rel(self) -> RelRun:
return RelRun(self.run_nr, self.nickname, self.squad_members, pt_found,
phase_durations, shield_phases, legs, body_dur, pylon_dur)

@property
def failed_run_duration_str(self):
if self.final_time is not None and self.heist_start is not None:
return f'{fg.cyan}If Profit-Taker was killed, the run likely lasted around ' \
f'{fg.li_cyan}{time_str(self.final_time - self.heist_start, "units")}.\n'
return ''


class Analyzer:

def __init__(self):
self.follow_mode = False
self.runs: list[RelRun] = []
self.runs: list[Union[RelRun, RunAbort, BuggedRun]] = []
self.proper_runs: list[RelRun] = []

def run(self):
filename = self.get_file()
Expand Down Expand Up @@ -352,22 +366,34 @@ def analyze_log(self, dropped_file: str):
require_heist_start = True
while True:
try:
self.runs.append(self.read_run(it, len(self.runs) + 1, require_heist_start).to_rel())
run = self.read_run(it, len(self.runs) + 1, require_heist_start).to_rel()
self.runs.append(run)
self.proper_runs.append(run)
require_heist_start = True
except RunAbort as abort:
self.runs.append(abort)
require_heist_start = abort.require_heist_start
except BuggedRun as buggedRun:
print(color(str(buggedRun), fg.li_red)) # Prints reasons why the run failed
self.runs.append(buggedRun)
require_heist_start = True
except LogEnd:
pass
if len(self.runs) > 0:
best_run = min(self.runs, key=lambda run_: run_.length)

# Determine the best run
if len(self.proper_runs) > 0:
best_run = min(self.proper_runs, key=lambda run_: run_.length)
best_run.best_run = True

# Display all runs
if len(self.runs) > 0:
for run in self.runs:
run.pretty_print()
if isinstance(run, RelRun):
run.pretty_print()
else: # Aborted or bugged run, just print the exception
print(run)

self.print_summary()
if len(self.proper_runs) > 0:
self.print_summary()
else:
print(f'{fg.white}No valid Profit-Taker runs found.\n'
f'Note that you have to be host throughout the entire run for it to show up as a valid run.')
Expand All @@ -383,6 +409,7 @@ def follow_log(self, filename: str):
try:
run = self.read_run(it, len(self.runs) + 1, require_heist_start).to_rel()
self.runs.append(run)
self.proper_runs.append(run)
require_heist_start = True

if run.length < best_time:
Expand All @@ -391,9 +418,12 @@ def follow_log(self, filename: str):
run.pretty_print()
self.print_summary()
except RunAbort as abort:
print(abort)
self.runs.append(abort)
require_heist_start = abort.require_heist_start
except BuggedRun as buggedRun:
print(color(str(buggedRun), fg.li_red)) # Prints reasons why the run failed
print(buggedRun) # Print reasons why the run failed
self.runs.append(buggedRun)
require_heist_start = True

def read_run(self, log: Iterator[str], run_nr: int, require_heist_start=False) -> AbsRun:
Expand All @@ -409,7 +439,7 @@ def read_run(self, log: Iterator[str], run_nr: int, require_heist_start=False) -
"""
# Find heist load.
if require_heist_start: # Heist load is not required if the previous abort signifies the start of a new mission
Analyzer.skip_until_one_of(log, [lambda line: Constants.HEIST_START in line])
Analyzer.skip_until_one_of(log, [lambda line: MiscConstants.HEIST_START in line])

run = AbsRun(run_nr)

Expand All @@ -425,58 +455,70 @@ def register_phase(self, log: Iterator[str], run: AbsRun, phase: int) -> None:
"""
kill_sequence = 0
while True: # match exists for phases 1-3, kill_sequence for phase 4.
pt_line_match = True
try:
line = next(log)
except StopIteration:
raise LogEnd()

if Constants.SHIELD_SWITCH in line: # Shield switch
# Check for PT specific messages
if PTConstants.SHIELD_SWITCH in line: # Shield switch
# Shield_phase '3.5' is for when shields swap during the pylon phase in phase 3.
shield_phase = 3.5 if phase == 3 and 3 in run.pylon_start else phase
run.shield_phases[shield_phase].append(Analyzer.shield_from_line(line))

# The first shield can help determine whether to abort.
if self.follow_mode and len(run.shield_phases[1]) == 1:
print(f'{fg.white}First shield: {fg.li_cyan}{run.shield_phases[phase][0][0]}')
elif any(True for shield_end in Constants.SHIELD_PHASE_ENDINGS.values() if shield_end in line):
elif any(True for shield_end in PTConstants.SHIELD_PHASE_ENDINGS.values() if shield_end in line):
run.shield_phase_endings[phase] = Analyzer.time_from_line(line)
elif Constants.LEG_KILL in line: # Leg kill
elif PTConstants.LEG_KILL in line: # Leg kill
run.legs[phase].append(Analyzer.time_from_line(line))
elif Constants.BODY_VULNERABLE in line: # Body vulnerable / phase 4 end
elif PTConstants.BODY_VULNERABLE in line: # Body vulnerable / phase 4 end
if kill_sequence == 0: # Only register the first invuln message on each phase
run.body_vuln[phase] = Analyzer.time_from_line(line)
kill_sequence += 1 # 3x BODY_VULNERABLE in one phase means PT dies.
if kill_sequence == 3: # PT dies.
run.body_kill[phase] = Analyzer.time_from_line(line)
return
elif Constants.STATE_CHANGE in line: # Generic state change
elif PTConstants.STATE_CHANGE in line: # Generic state change
# Generic match on state change to find things we can't reliably find otherwise
new_state = int(line.split()[8])
# State 3, 5 and 6 are body kills for phases 1, 2 and 3.
if new_state in [3, 5, 6]:
run.body_kill[phase] = Analyzer.time_from_line(line)
elif Constants.PYLONS_LAUNCHED in line: # Pylons launched
elif PTConstants.PYLONS_LAUNCHED in line: # Pylons launched
run.pylon_start[phase] = Analyzer.time_from_line(line)
elif Constants.PHASE_1_START in line: # Profit-Taker found
elif PTConstants.PHASE_1_START in line: # Profit-Taker found
run.pt_found = Analyzer.time_from_line(line)
elif Constants.PHASE_ENDS[phase] in line and phase != Constants.FINAL_PHASE: # Phase endings minus phase 4
elif PTConstants.PHASE_ENDS[phase] in line and phase != PTConstants.FINAL_PHASE: # Phase endings minus p4
if phase in [1, 3]: # Ignore phase 2 as it already matches body_kill.
run.pylon_end[phase] = Analyzer.time_from_line(line)
return
elif Constants.NICKNAME in line: # Nickname
else:
pt_line_match = False

if pt_line_match:
run.final_time = Analyzer.time_from_line(line)
continue

# Non-pt specific messages
if MiscConstants.NICKNAME in line: # Nickname
# Note: Replacing "î\x80\x80" has to be done since the Veilbreaker update botched names
run.nickname = line.replace(',', '').replace(\x80\x80", "").split()[-2]
elif Constants.ELEVATOR_EXIT in line: # Elevator exit (start of speedrun timing)
if not run.heist_start: # Only use the first time that the zone is left aka heist is started.
run.heist_start = Analyzer.time_from_line(line)
elif Constants.HEIST_START in line: # New heist start found
raise RunAbort(require_heist_start=False)
elif Constants.HOST_MIGRATION in line: # Host migration
raise RunAbort(require_heist_start=True)
elif Constants.SQUAD_MEMBER in line: # Squad member
elif MiscConstants.SQUAD_MEMBER in line: # Squad member
# Note: Replacing "î\x80\x80" has to be done since the Veilbreaker update botched names
# Note: The characters might represent the player's platform
run.squad_members.add(line.replace(\x80\x80", "").split()[-4])
elif MiscConstants.ELEVATOR_EXIT in line: # Elevator exit (start of speedrun timing)
if not run.heist_start: # Only use the first time that the zone is left aka heist is started.
run.heist_start = Analyzer.time_from_line(line)
elif MiscConstants.HEIST_START in line: # New heist start found
raise RunAbort(run, require_heist_start=False)
elif MiscConstants.BACK_TO_TOWN in line or MiscConstants.ABORT_MISSION in line:
raise RunAbort(run, require_heist_start=True)
elif MiscConstants.HOST_MIGRATION in line: # Host migration
raise RunAbort(run, require_heist_start=True)

@staticmethod
def time_from_line(line: str) -> float:
Expand All @@ -497,22 +539,22 @@ def skip_until_one_of(log: Iterator[str], conditions: list[Callable[[str], bool]
raise LogEnd()

def print_summary(self):
assert len(self.runs) > 0
best_run = min(self.runs, key=lambda run: run.length)
assert len(self.proper_runs) > 0
best_run = min(self.proper_runs, key=lambda run: run.length)
print(f'{fg.li_green}Best run:\t\t'
f'{fg.li_cyan}{time_str(best_run.length, "units")} '
f'{fg.cyan}(Run #{best_run.run_nr})')
print(f'{fg.li_green}Median time:\t\t'
f'{fg.li_cyan}{time_str(median(run.length for run in self.runs), "units")}')
f'{fg.li_cyan}{time_str(median(run.length for run in self.proper_runs), "units")}')
print(f'{fg.li_green}Median fight duration:\t'
f'{fg.li_cyan}{time_str(median(run.length - run.pt_found for run in self.runs), "units")}\n')
f'{fg.li_cyan}{time_str(median(run.length - run.pt_found for run in self.proper_runs), "units")}\n')
print(f'{fg.li_green}Median sum of parts {fg.li_cyan}'
f'{time_str(median(run.sum_of_parts for run in self.runs), "brackets")}')
f'{time_str(median(run.sum_of_parts for run in self.proper_runs), "brackets")}')
print(f'{fg.white} Median shield change:\t{fg.li_green}'
f'{median(run.shield_sum for run in self.runs):7.3f}s')
f'{median(run.shield_sum for run in self.proper_runs):7.3f}s')
print(f'{fg.white} Median leg break:\t{fg.li_green}'
f'{median(run.leg_sum for run in self.runs):7.3f}s')
f'{median(run.leg_sum for run in self.proper_runs):7.3f}s')
print(f'{fg.white} Median body killed:\t{fg.li_green}'
f'{median(run.body_sum for run in self.runs):7.3f}s')
f'{median(run.body_sum for run in self.proper_runs):7.3f}s')
print(f'{fg.white} Median pylons:\t\t{fg.li_green}'
f'{median(run.pylon_sum for run in self.runs):7.3f}s')
f'{median(run.pylon_sum for run in self.proper_runs):7.3f}s')
19 changes: 17 additions & 2 deletions src/exceptions/bugged_run.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,27 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from sty import fg

from src.utils import time_str

if TYPE_CHECKING:
from src.analyzer import AbsRun


class BuggedRun(RuntimeError):
"""An exception indicating that a run has bugged out - it does not have
enough information to convert to a relative run.
If require_heist_start is set to True, the analyzer should look for a 'job start' line.
Otherwise, the analyzer can assume that a new run started that aborted the old run."""
def __init__(self, reasons: list[str]):
def __init__(self, run: AbsRun, reasons: list[str]):
self.run = run
self.reasons = reasons

def __str__(self):
reason_str = '\n'.join(self.reasons)
return f'Bugged run detected, no stats will be displayed. Bugs found:\n{reason_str}\n'
return f'{fg.li_red}Profit-Taker Run #{self.run.run_nr} was bugged, no stats will be displayed. ' \
f'Bugs found:\n{reason_str}\n' \
f'{self.run.failed_run_duration_str}'
19 changes: 18 additions & 1 deletion src/exceptions/run_abort.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,24 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from sty import fg

from src.utils import time_str

if TYPE_CHECKING:
from src.analyzer import AbsRun


class RunAbort(Exception):
"""An exception indicating that a run has aborted.
If require_heist_start is set to True, the analyzer should look for a 'job start' line.
Otherwise, the analyzer can assume that a new run started that aborted the old run."""
def __init__(self, *, require_heist_start: bool):
def __init__(self, run: AbsRun, *, require_heist_start: bool):
self.run = run
self.require_heist_start = require_heist_start

def __str__(self):
return f'{fg.cyan}Profit-Taker Run #{self.run.run_nr} was aborted or had bugged logs.\n' \
f'{self.run.failed_run_duration_str}'

0 comments on commit 48f5be1

Please sign in to comment.