Skip to content

Commit

Permalink
support MS Excel tabulator status file (#2018)
Browse files Browse the repository at this point in the history
* support MS Excel tabulator status file

* PR feedback

* preprocess rows and favor list comprehensions over manual iterator

* use dict comprehension for tabulator_id_to_name
  • Loading branch information
kshen0 authored Oct 29, 2024
1 parent 4615ef9 commit 1c0edfc
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 18 deletions.
92 changes: 84 additions & 8 deletions server/api/batch_inventory.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import re
import shutil
import tempfile
from collections import defaultdict
Expand All @@ -12,6 +13,8 @@
from werkzeug.exceptions import BadRequest, Conflict
from sqlalchemy.orm import Session

from server.util.string import strip_optional_string


from ..database import db_session, engine
from . import api
Expand Down Expand Up @@ -50,6 +53,10 @@
from ..activity_log.activity_log import UploadFile, activity_base, record_activity
from ..util.get_json import safe_get_json_dict


# Column header labels searched for in the Excel version of the tabulator
# status report to locate the tabulator ID and tabulator name columns.
TABULATOR_ID = "Tabulator Id"
NAME = "Name"

# (tabulator_id, batch_id)
BatchKey = Tuple[str, str]

Expand Down Expand Up @@ -428,8 +435,8 @@ def process():


# User-facing message raised (as a UserError) whenever the uploaded tabulator
# status file cannot be parsed or does not contain the expected tabulator data.
# Both the plain XML and the Excel version of the report are accepted, so the
# message must not tell users to avoid the Excel export.
TABULATOR_STATUS_PARSE_ERROR = (
    "We could not parse this file. Please make sure you upload either the plain XML version or Excel version of the tabulator status report."
    ' The file name should end in ".xml".'
)


Expand All @@ -442,19 +449,88 @@ def process_batch_inventory_tabulator_status_file(
jurisdiction = Jurisdiction.query.get(jurisdiction_id)
batch_inventory_data = BatchInventoryData.query.get(jurisdiction_id)

def get_tabulator_id_to_name_dict_for_excel_file(
    cvr_xml: ElementTree.ElementTree,
) -> Dict[str, str]:
    """Build a tabulator ID -> tabulator name mapping from the Excel report.

    Reads every row of the "Tabulator Status" worksheet, locates the header
    row (the first row containing the "Tabulator Id" label), and pairs the
    "Tabulator Id" cell of each following row with its "Name" cell.

    Raises:
        UserError: if no header row is found, the header row has no "Name"
            column, there are no rows after the header, or a row after the
            header has too few cells to hold both columns.
    """
    namespaces = {"ss": "urn:schemas-microsoft-com:office:spreadsheet"}
    # List of all rows in the table
    rows = cvr_xml.findall(
        ".//ss:Worksheet[@ss:Name='Tabulator Status']/ss:Table/ss:Row",
        namespaces,
    )
    # Stripped text of the String-typed cells of each row. Cells of any other
    # type (e.g. Number) are not selected by the XPath below, so they do not
    # appear in a row's list. eg.
    # [ ...
    #   ["Tabulator Id", "Name", "Load Status", "Total Ballots Cast"],
    #   ["TABULATOR1", "Tabulator One"],
    #   ["TABULATOR2", "Tabulator Two"],
    #   ...
    # ]
    # NOTE(review): column positions are therefore positions among String
    # cells only; this assumes the ID and name cells are String-typed and come
    # before any non-String cell in a data row -- confirm against real exports.
    rows_with_cell_text = [
        [
            strip_optional_string(data_element.text)
            for data_element in row.findall(
                "ss:Cell/ss:Data[@ss:Type='String']", namespaces
            )
        ]
        for row in rows
    ]

    # Find the column headers row so we know at which indices to access
    # "Tabulator Id" and "Name" later
    column_header_row_index = next(
        (
            i
            for i, row_cells in enumerate(rows_with_cell_text)
            if TABULATOR_ID in row_cells
        ),
        None,
    )

    # Validate column header row was found
    if column_header_row_index is None:
        raise UserError(TABULATOR_STATUS_PARSE_ERROR)

    column_headers_row = rows_with_cell_text[column_header_row_index]

    # Validate the header row also has a "Name" column; otherwise list.index
    # below would raise a bare ValueError instead of a user-facing error
    if NAME not in column_headers_row:
        raise UserError(TABULATOR_STATUS_PARSE_ERROR)

    # Validate we have at least 1 row of tabulator data after the column headers
    tabulator_data_rows = rows_with_cell_text[column_header_row_index + 1 :]
    if not tabulator_data_rows:
        raise UserError(TABULATOR_STATUS_PARSE_ERROR)

    # Get the position of "Tabulator Id" and "Name" values in the list of cells
    # for a single row
    tabulator_id_index = column_headers_row.index(TABULATOR_ID)
    tabulator_name_index = column_headers_row.index(NAME)

    # Validate every data row is long enough to index both columns, so a
    # malformed row surfaces as a user-facing error rather than an IndexError
    required_cell_count = max(tabulator_id_index, tabulator_name_index) + 1
    if any(len(row_cells) < required_cell_count for row_cells in tabulator_data_rows):
        raise UserError(TABULATOR_STATUS_PARSE_ERROR)

    return {
        tabulator_data_row[tabulator_id_index]: tabulator_data_row[
            tabulator_name_index
        ]
        for tabulator_data_row in tabulator_data_rows
    }

def get_tabulator_id_to_name_dict_for_plain_xml_file(
    cvr_xml: ElementTree.ElementTree,
) -> Dict[Optional[str], Optional[str]]:
    """Build a tabulator ID -> tabulator name mapping from the plain XML report.

    Each `tabulators/tb` element contributes one entry, keyed by its `tid`
    attribute with its `name` attribute as the value (either may be None when
    the attribute is absent).

    Raises:
        UserError: if the document contains no `tabulators/tb` elements.
    """
    tabulator_elements = cvr_xml.findall("tabulators/tb")
    if not tabulator_elements:
        raise UserError(TABULATOR_STATUS_PARSE_ERROR)

    id_to_name: Dict[Optional[str], Optional[str]] = {}
    for element in tabulator_elements:
        id_to_name[element.get("tid")] = element.get("name")
    return id_to_name

def process():
file = retrieve_file(batch_inventory_data.tabulator_status_file.storage_path)
try:
cvr_xml = ElementTree.parse(file)
except Exception as error:
raise UserError(TABULATOR_STATUS_PARSE_ERROR) from error

tabulators = cvr_xml.findall("tabulators/tb")
if len(tabulators) == 0:
raise UserError(TABULATOR_STATUS_PARSE_ERROR)
tabulator_id_to_name = {
tabulator.get("tid"): tabulator.get("name") for tabulator in tabulators
}
root = cvr_xml.getroot()
is_ms_excel_file = re.match(
r"\{urn:schemas-microsoft-com:office:spreadsheet\}", root.tag
)
tabulator_id_to_name = (
get_tabulator_id_to_name_dict_for_excel_file(cvr_xml)
if is_ms_excel_file
else get_tabulator_id_to_name_dict_for_plain_xml_file(cvr_xml)
)

ballot_count_by_batch = items_list_to_dict(
batch_inventory_data.election_results["ballot_count_by_batch"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,30 @@

snapshots = Snapshot()

snapshots[
"test_batch_inventory_excel_tabulator_status_file 1"
] = """Batch Inventory Worksheet\r
\r
Section 1: Check Ballot Groups\r
1. Compare the CVR Ballot Count for each ballot group to your voter check-in data.\r
2. Ensure that the numbers reconcile. If there is a large discrepancy contact your SOS liaison.\r
\r
Ballot Group,CVR Ballot Count,Checked? (Type Yes/No)\r
Election Day,13,\r
Mail,2,\r
\r
Section 2: Check Batches\r
1. Locate each batch in storage.\r
2. Confirm the CVR Ballot Count is correct using associated documentation. Do NOT count the ballots. If there is a large discrepancy contact your SOS liaison.\r
3. Make sure there are no batches missing from this worksheet.\r
\r
Batch,CVR Ballot Count,Checked? (Type Yes/No)\r
Tabulator 1 - BATCH1,3,\r
Tabulator 1 - BATCH2,3,\r
Tabulator 2 - BATCH1,3,\r
Tabulator 2 - BATCH2,6,\r
"""

snapshots[
"test_batch_inventory_happy_path 1"
] = """Batch Inventory Worksheet\r
Expand Down
78 changes: 68 additions & 10 deletions server/tests/batch_comparison/test_batch_inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -991,11 +991,12 @@ def test_batch_inventory_missing_data_multi_contest_batch_comparison(
assert_ok(rv)


def test_batch_inventory_wrong_tabulator_status_file(
def test_batch_inventory_excel_tabulator_status_file(
client: FlaskClient,
election_id: str,
jurisdiction_ids: List[str],
contest_id: str, # pylint: disable=unused-argument
snapshot,
):
set_logged_in_user(
client, UserType.JURISDICTION_ADMIN, default_ja_email(election_id)
Expand Down Expand Up @@ -1116,16 +1117,30 @@ def test_batch_inventory_wrong_tabulator_status_file(
</Row>
<Row>
<Cell>
<Data ss:Type="String">10</Data>
<Data ss:Type="String">TABULATOR1</Data>
</Cell>
<Cell>
<Data ss:Type="String">ED-ICP 1</Data>
<Data ss:Type="String">Tabulator 1</Data>
</Cell>
<Cell>
<Data ss:Type="Number">1</Data>
</Cell>
<Cell ss:StyleID="Number">
<Data ss:Type="Number">538</Data>
<Data ss:Type="Number">123</Data>
</Cell>
</Row>
<Row>
<Cell>
<Data ss:Type="String">TABULATOR2</Data>
</Cell>
<Cell>
<Data ss:Type="String">Tabulator 2</Data>
</Cell>
<Cell>
<Data ss:Type="Number">1</Data>
</Cell>
<Cell ss:StyleID="Number">
<Data ss:Type="Number">456</Data>
</Cell>
</Row>
</Table>
Expand All @@ -1141,12 +1156,55 @@ def test_batch_inventory_wrong_tabulator_status_file(
rv = client.get(
f"/api/election/{election_id}/jurisdiction/{jurisdiction_ids[0]}/batch-inventory/tabulator-status"
)
tabulator_status = json.loads(rv.data)
assert tabulator_status["processing"]["status"] == ProcessingStatus.ERRORED
assert (
tabulator_status["processing"]["error"]
== 'We could not parse this file. Please make sure you upload the plain XML version of the tabulator status report. The file name should end in ".xml" and should not contain the words "To Excel".'
compare_json(
json.loads(rv.data),
{
"file": {"name": "tabulator-status.xml", "uploadedAt": assert_is_date},
"processing": {
"status": ProcessingStatus.PROCESSED,
"startedAt": assert_is_date,
"completedAt": assert_is_date,
"error": None,
},
},
)

# Download worksheet
rv = client.get(
f"/api/election/{election_id}/jurisdiction/{jurisdiction_ids[0]}/batch-inventory/worksheet"
)
snapshot.assert_match(rv.data.decode("utf-8"))


def test_batch_inventory_wrong_tabulator_status_file(
client: FlaskClient,
election_id: str,
jurisdiction_ids: List[str],
contest_id: str, # pylint: disable=unused-argument
):
set_logged_in_user(
client, UserType.JURISDICTION_ADMIN, default_ja_email(election_id)
)

# Set system type
rv = put_json(
client,
f"/api/election/{election_id}/jurisdiction/{jurisdiction_ids[0]}/batch-inventory/system-type",
{"systemType": CvrFileType.DOMINION},
)
assert_ok(rv)

# Upload CVR file
rv = client.put(
f"/api/election/{election_id}/jurisdiction/{jurisdiction_ids[0]}/batch-inventory/cvr",
data={
"cvr": (
io.BytesIO(TEST_CVR.encode()),
"cvrs.csv",
),
},
)
assert_ok(rv)

# Upload tabulator status HTML version
rv = client.put(
Expand Down Expand Up @@ -1248,7 +1306,7 @@ def test_batch_inventory_wrong_tabulator_status_file(
assert tabulator_status["processing"]["status"] == ProcessingStatus.ERRORED
assert (
tabulator_status["processing"]["error"]
== 'We could not parse this file. Please make sure you upload the plain XML version of the tabulator status report. The file name should end in ".xml" and should not contain the words "To Excel".'
== 'We could not parse this file. Please make sure you upload either the plain XML version or Excel version of the tabulator status report. The file name should end in ".xml".'
)


Expand Down
8 changes: 8 additions & 0 deletions server/util/string.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
from typing import Optional


# Renders `count` with thousands separators, followed by `singular` when the
# count is exactly 1 and by `plural` for every other value.
def format_count(count: int, singular: str, plural: str) -> str:
    noun = singular if count == 1 else plural
    return f"{count:,} {noun}"


# Returns `value` with surrounding whitespace removed, or the empty string
# when `value` is None (or otherwise falsy). Never returns None.
def strip_optional_string(value: Optional[str]) -> str:
    if not value:
        return ""
    return value.strip()

0 comments on commit 1c0edfc

Please sign in to comment.