Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix data loss issue by ensuring proper locking and clearing of changes #314

Merged
merged 1 commit into from
Jan 10, 2025

Conversation

koseki2580
Copy link
Contributor

Summary

This pull request addresses a data loss issue by optimizing the timing of lock management. Specifically, it ensures that data retrieval and clearing operations are performed within the same lock scope, preventing data loss caused by concurrent modifications.

Changes

Modified lock management to ensure data is cleared within the same lock scope immediately after retrieval.
Previously, the lock was released after retrieving data, allowing other threads to modify the data before it was cleared. This caused data loss.

Problem Description

The following code retrieves data from changes after acquiring a lock, releases the lock, and then reacquires the lock to clear the data. This creates a window where other threads can add new events to changes, resulting in data loss.

let py_changes = slf
          .borrow()
          .changes
          .lock()
          .unwrap()
          .to_owned()
          .into_pyobject(py)?
          .into_any()
          .unbind();
slf.borrow().clear();
impl RustNotify {
    fn clear(&self) {
        self.changes.lock().unwrap().clear();
    }
}

As a result, any new events added to changes during this gap are not retrieved correctly, leading to data loss.

Reproduction Steps

Add Logging

Insert logging statements at the following points to visualize the issue:

  • Around line 178 in src/lib.rs.
  • Before and after the py_changes assignment at line 331.
  • Before and after the slf.borrow().clear(); call at line 340.
// use custom log
use chrono::Local;
fn log_with_timestamp(message: &str) {
    let timestamp = Local::now().format("%Y-%m-%d %H:%M:%S%.6f").to_string();
    eprintln!("[{}] {}", timestamp, message);
}

log_with_timestamp("detect change");
changes_clone.lock().unwrap().insert((change, path)); // 178
log_with_timestamp("add changes log");
log_with_timestamp("before get changes (base)");
let py_changes = slf // 331
    .borrow()
    .changes
    .lock()
    .unwrap()
    .to_owned()
    .into_pyobject(py)?
    .into_any()
    .unbind();
log_with_timestamp("after get changes (base)");
log_with_timestamp("before clear (base)");
slf.borrow().clear(); // 340
log_with_timestamp("after clear (base)");

Logs Observed

When running the application in a test environment, the following logs indicate the problem.

[2024-12-01 18:55:28.420324] after get changes (base)
[2024-12-01 18:55:28.420329] add changes log
[2024-12-01 18:55:28.420405] detect change
[2024-12-01 18:55:28.420432] add changes log
[2024-12-01 18:55:28.420372] before clear (base)


[2024-12-01 18:55:45.140948] after get changes (base)
[2024-12-01 18:55:45.140972] detect change
[2024-12-01 18:55:45.141018] add changes log
[2024-12-01 18:55:45.141045] detect change
[2024-12-01 18:55:45.141072] add changes log
[2024-12-01 18:55:45.140976] before clear (base)

These logs demonstrate that:

  1. Data is retrieved from changes (to be passed to Python).
  2. New events are added to changes.
  3. changes is cleared.

This order of operations leads to data loss as new events are not included in the cleared data.

File Creation Script
# create_file.py
import argparse
import os
import shutil
import time
from pathlib import Path


def create_file(create_time_s: int):
    # Create a file
    basepath = Path(__file__).parent
    output_path = basepath / 'output'
    shutil.rmtree(output_path, ignore_errors=True)

    output_path.mkdir(exist_ok=True)
    end_time = time.time() + create_time_s
    cnt = 0
    while time.time() < end_time:
        file_path = output_path / f'{cnt}.txt'
        with open(file_path, 'w') as f:
            f.write('Hello, World!')
            f.flush()
        cnt += 1
        os.remove(file_path)
        # time.sleep(0.001)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Create files')
    parser.add_argument('-t', '--time', type=int, default=1, help='Create time in seconds')
    args = parser.parse_args()
    create_file(args.time)
Script Using watchfiles to Monitor Changes
# check_dir.py
import shutil
from pathlib import Path

from watchfiles import watch


def check_dir():
    basepath = Path(__file__).parent
    check_path = basepath / 'output'
    shutil.rmtree(check_path, ignore_errors=True)

    check_path.mkdir(exist_ok=True)
    output_file = basepath / 'detect_output_files.txt'
    output_file.unlink(missing_ok=True)
    with open(output_file, 'a') as f:
        for changes in watch(str(check_path), debounce=100):
            for change in changes:
                f.write(f'{change[1]}\n')


if __name__ == '__main__':
    check_dir()
Subprocess Script to Run Logging
# subprocess.py
import subprocess
from pathlib import Path

basepath = Path(__file__).parent
log_path = basepath / 'watch.log'


def run_subprocess():
    # Run a subprocess
    run_path = basepath / 'check_dir.py'
    with open(log_path, 'w') as f:
        p = subprocess.Popen(['python', str(run_path)], stdout=f, stderr=subprocess.STDOUT)
        print(f'subprocess pid: {p.pid}')
        p.wait()


if __name__ == '__main__':
    run_subprocess()
Script to Extract Specific Log Entries
import os
import re
from pathlib import Path

read_files = ['watch_base.log', 'watch_fix.log']

basepath = Path(__file__).parent


def extract_between_patterns(file_path):
    get_changes_pattern = r'\[.*\] after get changes \(.*\)'
    clear_pattern = r'\[.*\] before clear \(.*\)'

    result = []

    is_write = False

    output_file_prefix = 'result_'
    filename = os.path.basename(file_path)
    output_file = os.path.dirname(file_path) + '/' + output_file_prefix + filename
    with open(file_path, 'r') as file:
        for line in file:
            if re.search(get_changes_pattern, line):
                is_write = True
            if re.search(clear_pattern, line):
                is_write = False
                if len(result) <= 2:
                    result = []
                    continue
                with open(output_file, 'a') as f:
                    for r in result:
                        f.write(r)
                    f.write(line)
                    f.write('\n')
                    result = []
            if is_write:
                result.append(line)


for file in read_files:
    extract_between_patterns(basepath / file)

Fixed Implementation

The fix ensures that data retrieval and clearing are performed under the same lock:

let py_changes = {
    let borrowed = slf.borrow();
    let mut locked_changes = borrowed.changes.lock().unwrap();
    let py_changes = locked_changes.to_owned().into_pyobject(py)?.into_any().unbind();
    // Clear the changes while holding the lock
    locked_changes.clear();
    py_changes
};
Ok(py_changes)

This approach eliminates the data loss issue by ensuring no other thread can modify changes during the operation.

@samuelcolvin samuelcolvin merged commit fa1c853 into samuelcolvin:main Jan 10, 2025
45 of 46 checks passed
@samuelcolvin
Copy link
Owner

LGTM, thanks so much.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants