Skip to content

Commit

Permalink
Merge pull request #413 from CPJKU/kern_fixes
Browse files Browse the repository at this point in the history
Updates on kern support
  • Loading branch information
fosfrancesco authored Jan 20, 2025
2 parents b8b1322 + f2cd101 commit ff6eb30
Showing 1 changed file with 141 additions and 67 deletions.
208 changes: 141 additions & 67 deletions partitura/io/importkern.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@
}

KERN_DURS = {
"3%2": {"type": "whole", "dots": 0, "actual_notes": 3, "normal_notes": 2},
"2%3": {"type": "whole", "dots": 1},
"000": {"type": "maxima"},
"00": {"type": "long"},
"0": {"type": "breve"},
Expand Down Expand Up @@ -153,7 +151,6 @@ def _handle_kern_with_spine_splitting(kern_path: PathLike):
data = []
file = org_file.tolist()
file = [line.split("\t") for line in file if not line.startswith("!")]
continue_parsing = True
for i in range(len(spline_types)):
# Parse by voice
d, voice_indices, num_voices = parse_by_voice(file, dtype=dtype)
Expand All @@ -178,21 +175,55 @@ def _handle_kern_with_spine_splitting(kern_path: PathLike):


def element_parsing(
part: spt.Part, elements: np.array, total_duration_values: np.array, same_part: bool
part: spt.Part, elements: np.array, total_duration_values: np.array, same_part: bool,
doc_lines: np.array, line2pos: dict
):
"""
Parse and add musical elements to a part.
Parameters
----------
part : spt.Part
The partitura part to which elements will be added.
elements : np.array
Array of musical elements to be parsed and added.
total_duration_values : np.array
Array of total duration values for each element.
same_part : bool
Flag indicating if elements are being added to the same part.
doc_lines : np.array
Array of document lines corresponding to the elements.
line2pos : dict
Dictionary mapping document lines to their part start positions.
Returns
-------
line2pos : dict
Updated dictionary mapping document lines to their positions.
This is used to handle cases with split spines.
"""
divs_pq = part._quarter_durations[0]
line2pos = line2pos if same_part else {}
current_tl_pos = 0
editorial = False
measure_mapping = {m.number: m.start.t for m in part.iter_all(spt.Measure)}

for i in range(elements.shape[0]):
element = elements[i]
if i < len(doc_lines):
current_tl_pos = line2pos.get(doc_lines[i], current_tl_pos)

# Handle editorial elements
if isinstance(element, KernElement):
if element.editorial_start:
editorial = True
if element.editorial_end:
editorial = False

if element is None or editorial:
continue

# Handle generic notes
if isinstance(element, spt.GenericNote):
if total_duration_values[i] == 0:
duration_divs = symbolic_to_numeric_duration(
Expand All @@ -203,33 +234,43 @@ def element_parsing(
duration_divs = ceil(quarter_duration * divs_pq)
el_end = current_tl_pos + duration_divs
part.add(element, start=current_tl_pos, end=el_end)
line2pos[doc_lines[i]] = current_tl_pos
current_tl_pos = el_end

# Handle chords
elif isinstance(element, tuple):
# Chord
quarter_duration = 4 / total_duration_values[i]
duration_divs = ceil(quarter_duration * divs_pq)
el_end = current_tl_pos + duration_divs
for note in element[1]:
part.add(note, start=current_tl_pos, end=el_end)
line2pos[doc_lines[i]] = current_tl_pos
current_tl_pos = el_end

# Handle slurs
elif isinstance(element, spt.Slur):
start_sl = element.start_note.start.t
end_sl = element.end_note.start.t
part.add(element, start=start_sl, end=end_sl)

# Handle other elements
else:
# Do not repeat structural elements if they are being added to the same part.
if not same_part:
part.add(element, start=current_tl_pos)
line2pos[doc_lines[i]] = current_tl_pos
else:
if isinstance(element, spt.Measure):
current_tl_pos = measure_mapping[element.number]

return line2pos


# functions to initialize the kern parser
def load_kern(
filename: PathLike,
force_note_ids: Optional[Union[bool, str]] = None,
force_same_part: Optional[bool] = False,
) -> spt.Score:
"""
Parses an KERN file from path to Part.
Expand All @@ -246,104 +287,126 @@ def load_kern(
The score object containing the parts.
"""
try:
# This version of the parser is faster but does not support spine splitting.
# Attempt to load the file using a faster parser that does not support spine splitting
file = np.loadtxt(
filename, dtype="U", delimiter="\t", comments="!!", encoding="cp437"
)
parsing_idxs = np.arange(file.shape[0])
# Decide Parts

except ValueError:
# This version of the parser supports spine splitting but is slower.
# Fallback to a slower parser that supports spine splitting
file, parsing_idxs = _handle_kern_with_spine_splitting(filename)

partlist = []
# Get Main Number of parts and Spline Types
# Get the main number of parts and spline types
spline_types = file[0]

# Find parsable parts if they start with "**kern" or "**notes"
# Identify parsable parts that start with "**kern" or "**notes"
note_parts = np.char.startswith(spline_types, "**kern") | np.char.startswith(
spline_types, "**notes"
)
# Get Splines
# Extract splines for the identified parts
splines = file[1:].T[note_parts]
prev_staff = 1

has_instrument = np.char.startswith(splines, "*I")
# if all parts have the same instrument, then they are the same part.
# Determine if all parts have the same instrument
p_same_part = (
np.all(splines[has_instrument] == splines[has_instrument][0], axis=0)
if np.any(has_instrument)
else False
)
total_durations_list = list()
elements_list = list()
part_assignments = list()
copy_partlist = list()
# Determine if all parts have the same *part
has_part_global = np.char.startswith(splines, "*part")
p_same_part = (
np.all(splines[has_part_global] == splines[has_part_global][0], axis=0)
if np.any(has_part_global)
else p_same_part
)
# Assign all splines to the same part if necessary
if p_same_part or force_same_part:
parsing_idxs[:] = 0

# Initialize lists to store parsed data
total_durations_list = []
elements_list = []
part_assignments = []
copy_partlist = []
doc_lines_per_spline = []
# Initialize staff and voice numbers
prev_staff = 1
pvoice = 1

# Iterate over each spline (musical data stream)
for j, spline in enumerate(splines):
# Create a new SplineParser for the current spline
parser = SplineParser(
size=spline.shape[-1],
id="P{}".format(parsing_idxs[j]) if not p_same_part else "P{}".format(j),
id="P{}".format(parsing_idxs[j]),
staff=prev_staff,
voice=pvoice,
)
same_part = False
if parser.id in [p.id for p in copy_partlist]:
same_part = True
# Flag to indicate if the part is the same as a previous one
same_part = parser.id in [p.id for p in copy_partlist]
if same_part:
# If the part already exists, add to the previous part
warnings.warn(
"Part {} already exists. Adding to previous Part.".format(parser.id)
)
part = [p for p in copy_partlist if p.id == parser.id][0]
part = next(p for p in copy_partlist if p.id == parser.id)
# Check for staff information in the spline
has_staff = np.char.startswith(spline, "*staff")
staff = int(spline[has_staff][0][6:]) if np.count_nonzero(has_staff) else 1
staff = int(spline[has_staff][0][6:]) if np.count_nonzero(has_staff) else prev_staff
prev_staff = staff
if parser.staff != staff:
parser.staff = staff
else:
parser.voice += 1
elements = parser.parse(spline)
unique_durs = np.unique(parser.total_duration_values).astype(int)
divs_pq = np.lcm.reduce(unique_durs)
divs_pq = divs_pq if divs_pq > 4 else 4
# compare divs_pq to the divs_pq of the part
divs_pq = np.lcm.reduce([divs_pq, part._quarter_durations[0]])
part.set_quarter_duration(0, divs_pq)
# Update the voice number
parser.voice = pvoice + 1
else:
# If the part does not exist, create a new part
has_staff = np.char.startswith(spline, "*staff")
staff = int(spline[has_staff][0][6:]) if np.count_nonzero(has_staff) else 1
# Correction for currating multiple staffs.
if parser.staff != staff:
parser.staff = staff
prev_staff = staff
elements = parser.parse(spline)
# Routine to filter out non integer durations
unique_durs = np.unique(parser.total_duration_values)
# Remove all infinite values
unique_durs = unique_durs[np.isfinite(unique_durs)]
d_mul = 2
while not np.all(np.isclose(unique_durs % 1, 0.0)):
unique_durs = unique_durs * d_mul
d_mul += 1
unique_durs = unique_durs.astype(int)

divs_pq = np.lcm.reduce(unique_durs)
divs_pq = divs_pq if divs_pq > 4 else 4
# Initialize Part
parser.staff = staff
prev_staff = staff
parser.voice = 1
pvoice = 1

# Parse the spline into musical elements
elements, lines = parser.parse(spline)
# Calculate unique durations and ensure they are integers
unique_durs = np.unique(parser.total_duration_values)
unique_durs = unique_durs[np.isfinite(unique_durs)]
d_mul = 2
while not np.all(np.isclose(unique_durs % 1, 0.0)):
unique_durs *= d_mul
d_mul += 1
unique_durs = unique_durs.astype(int)
divs_pq = np.lcm.reduce(unique_durs)
divs_pq = max(divs_pq, 4)

if same_part:
divs_pq = np.lcm.reduce([divs_pq, part._quarter_durations[0]])
part.set_quarter_duration(0, divs_pq)
pvoice = parser.voice
else:
part = spt.Part(
id=parser.id, quarter_duration=divs_pq, part_name=parser.name
)

part_assignments.append(same_part)
doc_lines_per_spline.append(lines)
total_durations_list.append(parser.total_duration_values)
elements_list.append(elements)
copy_partlist.append(part)

# Currate parts to the same divs per quarter
# Ensure all parts have the same divs per quarter
divs_pq = np.lcm.reduce([p._quarter_durations[0] for p in copy_partlist])
for part in copy_partlist:
part.set_quarter_duration(0, divs_pq)

for part, elements, total_duration_values, same_part in zip(
copy_partlist, elements_list, total_durations_list, part_assignments
line2pos = {}
for part, elements, total_duration_values, same_part, doc_lines in zip(
copy_partlist, elements_list, total_durations_list, part_assignments, doc_lines_per_spline
):
element_parsing(part, elements, total_duration_values, same_part)
line2pos = element_parsing(part, elements, total_duration_values, same_part, doc_lines, line2pos)

for i, part in enumerate(copy_partlist):
if part_assignments[i]:
Expand All @@ -365,7 +428,7 @@ def load_kern(
)

doc_name = get_document_name(filename)
# inversing the partlist results to correct part order and visualization for exporting musicxml files
# Reverse the partlist to correct part order and visualization for exporting musicxml files
score = spt.Score(partlist=partlist[::-1], id=doc_name)
return score

Expand All @@ -380,6 +443,7 @@ def __init__(self, id="P1", staff=1, voice=1, size=1, name=""):
self.alters = []
self.size = size
self.total_parsed_elements = 0
self.measure_enum = 1
self.tie_prev = None
self.tie_next = None
self.slurs_start = []
Expand All @@ -399,16 +463,12 @@ def parse(self, spline: np.array):
elements: np.array
The parsed elements of the spline line.
"""
lines = np.arange(len(spline))
# Remove "-" lines
spline = spline[spline != "-"]
# Remove "." lines
spline = spline[spline != "."]
# Remove Empty lines
spline = spline[spline != ""]
# Remove None lines
spline = spline[spline != None]
# Remove lines that start with "!"
spline = spline[np.char.startswith(spline, "!") == False]
mask = (spline == "-") | (spline == ".") | (spline == "") | (spline is None) | (np.char.startswith(spline, "!") == True)
mask = ~mask
spline = spline[mask]
lines = lines[mask]
# Empty Numpy array with objects
elements = np.empty(len(spline), dtype=object)
self.total_duration_values = np.ones(len(spline))
Expand Down Expand Up @@ -475,7 +535,7 @@ def parse(self, spline: np.array):
)
)

return elements
return elements, lines

def meta_tandem_line(self, line: str):
"""
Expand Down Expand Up @@ -573,6 +633,9 @@ def process_clef_line(self, line: str):
clef_line = 4
elif clef == "C":
clef_line = 3
elif clef == "X":
clef = "percussion"
clef_line = 1
else:
raise ValueError("Unrecognized clef line: {}".format(line))
else:
Expand Down Expand Up @@ -641,6 +704,12 @@ def _process_kern_duration(self, duration: str, is_grace=False):
dur = duration.replace(".", "")
if dur in KERN_DURS.keys():
symbolic_duration = copy.deepcopy(KERN_DURS[dur])
# support for extended kern durations
elif "%" in dur:
dur = dur.split("%")
nom, den = int(dur[0]), int(dur[1])
symbolic_duration = {"type": "whole", "dots": 0, "actual_notes": nom, "normal_notes": den}
dur = nom * den
else:
dur = float(dur)
key_loolup = [2**i for i in range(0, 9)]
Expand Down Expand Up @@ -676,6 +745,9 @@ def process_symbol(self, note: spt.Note, symbols: list):
symbols: list
List of symbols to process.
"""
# return if list is empty
if symbols == []:
return
if "[" in symbols:
self.tie_prev[self.total_parsed_elements] = True
# pop symbol and call again
Expand Down Expand Up @@ -790,7 +862,9 @@ def meta_barline_line(self, line: str):
number_index = line.index(number[0]) if number else line.index("=")
closing_repeat = re.findall(r"[:|]", line[:number_index])
opening_repeat = re.findall(r"[|:]", line[number_index:])
return spt.Measure(number=int(number[0]) if number else None)
m = spt.Measure(number=self.measure_enum, name=int(number[0]) if number else None)
self.measure_enum += 1
return m

def meta_chord_line(self, line: str):
"""
Expand Down

0 comments on commit ff6eb30

Please sign in to comment.