
Memory allocation of 100016100016100016100016 bytes failed #7774

Closed

financialclose opened this issue Mar 25, 2023 · 25 comments
Labels: bug (Something isn't working), python (Related to Python Polars)

@financialclose commented Mar 25, 2023

Polars version checks

  • I have checked that this issue has not already been reported.

  • I have confirmed this bug exists on the latest version of Polars.

Issue description

The latest version of Polars I currently use is 0.16.15. Running the following script returns the error "memory allocation of 100016100016100016100016 bytes failed". My computer has 32 GB of RAM installed, and it was almost fully used during this process. When I saw the error message, my computer became unresponsive and I had to restart it. You can use the data from https://github.com/financialclose/benchmarking to replicate the 1-million-row dataset up to 1 billion rows.

Reproducible example

import polars as pl
import time
import pathlib
s = time.time()

q = (
    pl.scan_csv("Input/1000MILLION-ROWS.CSV")
    .select(["Ledger", "Account", "PartNo", "Contact", "Project", "Unit Code", "D/C", "Currency"])
    .unique()
)

a = q.collect(streaming=True)
path: pathlib.Path = "Distinct.csv"
a.write_csv(path, separator=",")

e = time.time()
print("Polars Loading Time = {}".format(e-s))

Expected behavior

The expected result is a CSV file containing 8 columns × 99,696 rows of distinct records.

Installed versions

Replace this line with the output of pl.show_versions(), leave the backticks in place
financialclose added the bug (Something isn't working) and python (Related to Python Polars) labels on Mar 25, 2023
@ritchie46 (Member)

Have you got any more insight into which operation it happened in? Could you maybe run it under valgrind?

@financialclose (Author)

I use Go on Windows as my development environment, so debugging with Rust and valgrind might be hard. When I process the same file with my Go streaming function, memory usage is only 50%. For this distinct function I don't need Apache Arrow, because my algorithm converts the input stream directly to the output stream with the help of a Go map, so less memory is required. Bing Chat also discussed this case; here are some excerpts:

  • The program has a bug or a memory leak that causes it to request more memory than it needs or to release memory incorrectly.

  • The amount of memory that the program allocates on the heap is too large.

@ritchie46 (Member)

The amount of memory that the program allocates on the heap is too large.

That I understand, but I need to know where this happens.

@financialclose (Author) commented Mar 26, 2023

Polars' high performance inspires me to explore how to speed up data processing of CSV files. If Polars can handle a billion rows with multiple dimensions, I plan to compare Go and Rust on these big search commands: Distinct, GroupBy, Filter, Sort and JoinTable. Except for Filter, all commands will select all records and many columns. During the past 3 years of retirement I have spent about a third of my time researching fast/big dataframes, first in C# and then in Go. https://www.linkedin.com/in/max01/ is my LinkedIn profile, but your account does not accept new invitations.

@ghuls (Collaborator) commented Mar 26, 2023

Without streaming, the whole code takes around 90 seconds for the 1 billion row dataset:

Polars Loading Time = 90.52089810371399

With streaming it never finishes (it has been running for more than 8 CPU hours, i.e. 1 hour wall time), but it hasn't crashed yet (memory usage is constant at 113G).

@ritchie46 (Member)

Could you try a run with overflow checks, @ghuls? And maybe a run in valgrind?

[profile.release]
overflow-checks = true  

@ghuls (Collaborator) commented Mar 26, 2023

Could you try a run with overflow checks, @ghuls? And maybe a run in valgrind?

[profile.release]
overflow-checks = true  

I tried (1.5 CPU hours), but so far no overflow has happened (if those are printed to the screen).

@ritchie46 (Member)

Thanks. I shall take a look.

@ghuls (Collaborator) commented Mar 26, 2023

With 100 million rows it still seems to finish eventually, but without streaming it is really fast:

In [5]: q = (
   ...:    pl.scan_csv("1000MILLION-ROWS.csv")
   ...:    .head(10_000_000)
   ...:   .select(["Ledger", "Account", "PartNo", "Contact","Project","Unit Code", "D/C","Currency"]).unique()
   ...:   )

In [6]: a = q.collect(streaming=True)
   ...: path: pathlib.Path = "Distinct.csv"
   ...: a.write_csv(path, separator=",")
   ...: 
   ...: e = time.time()
   ...: print("Polars Loading Time = {}".format(e-s))
Polars Loading Time = 116.27505898475647

In [7]: q = (
   ...:    pl.scan_csv("1000MILLION-ROWS.csv")
   ...:    .head(100_000_000)
   ...:   .select(["Ledger", "Account", "PartNo", "Contact","Project","Unit Code", "D/C","Currency"]).unique()
   ...:   )

In [8]: a = q.collect(streaming=True)
   ...: path: pathlib.Path = "Distinct.csv"
   ...: a.write_csv(path, separator=",")
   ...: 
   ...: e = time.time()
   ...: print("Polars Loading Time = {}".format(e-s))
Polars Loading Time = 883.669862985611

In [9]: s = time.time()
   ...: a = q.collect(streaming=False)
   ...: path: pathlib.Path = "Distinct.csv"
   ...: a.write_csv(path, separator=",")
   ...: 
   ...: e = time.time()
   ...: print("Polars Loading Time = {}".format(e-s))
Polars Loading Time = 9.112022876739502

@financialclose (Author) commented Mar 27, 2023

I have only 32 GB of installed RAM; when not using streaming for 1,000 million rows, my machine returns the error "memory allocation of 624658120 bytes failed" within a few seconds. How much installed RAM do you have, @ghuls? Your Polars Loading Time = 9.112022876739502 is really fast for 100 million rows (on my machine, Polars Loading Time = 11.834631443023682), so I would like to know your hardware spec in order to reproduce this on similar hardware.

After using @ghuls' setting pl.scan_csv("1000MILLION-ROWS.csv").head(100_000_000), streaming now completes without error (Polars Loading Time = 353.3685688972473). But with streaming, 353 s is not good for my benchmark of Go versus Rust; there is room for improvement. My Go design implements two separate goroutine processes that run in parallel, Read File and Distinct, in a non-blocking design. Both processes also run multi-threaded, and each has its own timer: the Read File timer monitors the Distinct process, while the Distinct timer monitors the Read File process. This way the Read File process knows when to take a rest, to avoid loading excessive data into memory beyond a maximum default value, and the Distinct process can delete data once the relevant stream has been consumed; if no new stream is ready, it takes a rest, e.g. a 0.1 s sleep.
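For illustration only, here is a rough single-machine Python analogue of the design described above, with threads and a bounded queue standing in for goroutines and their timers. The batch size, queue depth and file paths are illustrative assumptions, not the author's Go code.

import csv
import queue
import threading

BATCH_SIZE = 100_000                      # illustrative batch size
batches = queue.Queue(maxsize=8)          # bounded queue: the reader blocks ("rests") when full

def read_file(path, columns):
    """Producer: stream the CSV in batches of key tuples."""
    with open(path, newline="") as f:
        reader = csv.DictReader(f)
        batch = []
        for row in reader:
            batch.append(tuple(row[c] for c in columns))
            if len(batch) >= BATCH_SIZE:
                batches.put(batch)        # blocks while the consumer is behind
                batch = []
        if batch:
            batches.put(batch)
    batches.put(None)                     # sentinel: no more data

def distinct(out_path, columns):
    """Consumer: deduplicate batches through a hash set and write the result."""
    seen = set()
    while (batch := batches.get()) is not None:
        seen.update(batch)
    with open(out_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(columns)
        writer.writerows(sorted(seen))

cols = ["Ledger", "Account", "PartNo", "Contact", "Project", "Unit Code", "D/C", "Currency"]
t1 = threading.Thread(target=read_file, args=("Input/1000MILLION-ROWS.CSV", cols))
t2 = threading.Thread(target=distinct, args=("Distinct.csv", cols))
t1.start(); t2.start(); t1.join(); t2.join()

The bounded queue provides the same backpressure that the timers implement in the Go version: the reader pauses automatically instead of loading the whole file into memory.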

@financialclose (Author)

If I use the .head(100_000) setting below, Polars Loading Time = 0.0312497615814209, which is not reasonable: the output has only 19,544 distinct rows, while the correct number should be 99,696. So I guess .head(100_000) is not a streaming batch; it only reads the first 100_000 rows.

q = (
    pl.scan_csv("Input/1000MILLION-ROWS.CSV")
    .head(100_000)
    .select(["Ledger", "Account", "PartNo", "Contact", "Project", "Unit Code", "D/C", "Currency"])
    .unique()
)

a = q.collect(streaming=True)
path: pathlib.Path = "Distinct.csv"
a.write_csv(path, separator=",")
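As a quick check of that guess, counting the rows that enter the pipeline shows that head(n) is just a row limit applied before unique(), not a streaming batch size. A small sketch reusing the file and limit from the snippet above:

import polars as pl

# Only the first 100_000 rows ever reach the distinct step.
n_in = (
    pl.scan_csv("Input/1000MILLION-ROWS.CSV")
    .head(100_000)
    .select(pl.count())
    .collect()
)
print(n_in)  # 100_000 input rows, hence only 19,544 of the 99,696 distinct rows appear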

ritchie46 self-assigned this on Mar 27, 2023
@ritchie46 (Member) commented Mar 27, 2023

Ok, found something.

Somehow the unique is not executed in the streaming engine. Will fix that in a later PR.
This is because maintain_order defaults to True. Will have a breaking change for this.

The streaming engine produces many chunks. In the 100M case: 16013. Our unique implementation seems to be O(n^2) in the number of chunks.
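Until that change lands, a possible workaround (assuming the installed version already exposes the maintain_order argument on unique) is to opt out of ordered output explicitly so the streaming engine can pick the operation up; this is a sketch, and whether unique actually streams still depends on the Polars version in use.

import polars as pl

# Sketch: allow unordered output so unique() becomes eligible for the streaming engine.
q = (
    pl.scan_csv("Input/1000MILLION-ROWS.CSV")
    .select(["Ledger", "Account", "PartNo", "Contact", "Project", "Unit Code", "D/C", "Currency"])
    .unique(maintain_order=False)
)
a = q.collect(streaming=True)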

@ritchie46 (Member)

@ghuls the performance issue should be resolved. Could you try another run?

@ghuls (Collaborator) commented Mar 27, 2023

@ghuls the performance issue should be resolved. Could you try another run?

It doesn't look solved yet (it was still compiled with overflow-checks = true), as it still runs for a long time (I killed it after 16 minutes), but it uses quite little CPU (most of the time less than 100% CPU across all 8 threads), while before it was using 800% CPU.

I will try one more time without overflow-checks = true.

@ghuls (Collaborator) commented Mar 27, 2023

Without overflow-checks = true it managed to finish in 17.5 minutes (so the run with overflow checks was probably relatively close to finishing).

Without streaming it took 76 seconds.

@ritchie46 (Member)

Hmm.. that's strange. I will look once more.

@financialclose (Author) commented Mar 28, 2023

@ghuls, if you use groupby (in addition to the 8-column distinct) to aggregate the sum of Quantity, Original Amount and Base Amount for both 1MillionRows and 1000MillionRows, you can validate whether the 1000MillionRows results are correct for your streaming and non-streaming methods.

With distinct, the results for 1 million rows and 1,000 million rows are the same. With groupby they are not: the aggregate sums for 1000MillionRows should be 1000x those of 1MillionRows.
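A sketch of that consistency check (the 1-million-row file name below is a placeholder for whatever the small file in the benchmarking repository is called):

import polars as pl

def totals(path: str) -> pl.DataFrame:
    # Sum the three numeric columns over the whole file.
    return (
        pl.scan_csv(path)
        .select([
            pl.sum("Quantity"),
            pl.sum("Original Amount"),
            pl.sum("Base Amount"),
        ])
        .collect(streaming=True)
    )

small = totals("Input/1MILLION-ROWS.CSV")    # placeholder file name
large = totals("Input/1000MILLION-ROWS.CSV")
print(small, large)                          # the large totals should be ~1000x the small ones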

I also wonder whether Polars can process DNA/RNA strings, e.g. counting ATCG occurrences, or counting particular ATC/CTA combinations that match the codon table. I find Bioconductor not very user-friendly: I spend most of my time installing particular modules, and there are over a thousand packages. When I use Colab to run Bioconductor, installing its packages is very time consuming.

@ritchie46 (Member)

I don't really understand. I can run the 100M file streaming in 6s and non-streaming in 5s.

The difference should be that the streaming creates more chunks, so reading and rechunking is a bit more expensive than in the non-streaming case, which creates n_threads chunks.

The unique is not streaming in either query, so we rechunk the file in the unique call.
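To make the chunking effect visible, one could compare chunk counts of the two collects; this is a rough illustration, and the exact numbers depend on the machine and file.

import polars as pl

lf = pl.scan_csv("1000MILLION-ROWS.csv").select(["Ledger", "Account"])

streamed = lf.collect(streaming=True)    # streaming engine emits many small chunks
in_memory = lf.collect(streaming=False)  # non-streaming collect ends up with ~n_threads chunks

print(streamed.n_chunks(), in_memory.n_chunks())

# unique() on the streamed result first pays for rechunking the many small chunks.
streamed = streamed.rechunk()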

@financialclose (Author) commented Mar 28, 2023

In my experience the Go stream engine is flexible. For the 1000M case the file can be divided into 72 sets of streams, and each stream into 100 partitions, each run in its own thread. If I increase the number of streams, the overhead becomes significant; if I decrease it, the data may not fit in the available memory. With the 72/100 setting the processing time is 104 s to 126 s, depending on whether uncontrollable Windows services are running in the background. Your test time of 6 s for 100M is very fast, so we would need to run on the same computer to compare times.

@financialclose (Author) commented Mar 28, 2023

For the 100M file, Rust without streaming takes 10.5 s on my machine, and Go with streaming takes 10.6 s, so I can say there is no performance difference on the 100M file; but I guess you have better hardware. The bottleneck in Go is its map: it does not accept a byte slice directly as a key, so the bytes must first be converted to a string.

@ghuls (Collaborator) commented Mar 28, 2023

@ritchie46 I retested. There was probably a file system issue on the server; now it seems to finish in a reasonable time.

In [76]: q = (
    ...:    pl.scan_csv("1000MILLION-ROWS.csv")
    ...:    .head(400_000_000)
    ...:   .select(["Ledger", "Account", "PartNo", "Contact","Project","Unit Code", "D/C","Currency"]).unique()
    ...:   )

In [77]: s = time.time()
    ...: a = q.collect(streaming=False)
    ...: path: pathlib.Path = "Distinct3.csv"
    ...: a.write_csv(path, separator=",")
t = time.time()

In [78]: t = time.time()

In [79]: t -s
Out[79]: 148.0989420413971

In [80]: s = time.time()
    ...: a = q.collect(streaming=False)
    ...: path: pathlib.Path = "Distinct3.csv"
    ...: a.write_csv(path, separator=",")
t = time.time()

In [81]: t = time.time()

In [82]: t -s
Out[82]: 37.7521448135376

In [83]: s = time.time()
    ...: a = q.collect(streaming=True)
    ...: path: pathlib.Path = "Distinct2.csv"
    ...: a.write_csv(path, separator=",")
t = time.time()

In [84]: t = time.time()

In [85]: t -s
Out[85]: 45.83093190193176

In [86]: q = (
    ...:    pl.scan_csv("1000MILLION-ROWS.csv")
    ...:    .head(800_000_000)
    ...:   .select(["Ledger", "Account", "PartNo", "Contact","Project","Unit Code", "D/C","Currency"]).unique()
    ...:   )

In [87]: s = time.time()
    ...: a = q.collect(streaming=False)
    ...: path: pathlib.Path = "Distinct3.csv"
    ...: a.write_csv(path, separator=",")
t = time.time()

In [88]: t = time.time()

In [89]: t -s
Out[89]: 208.60496068000793

In [90]: s = time.time()
    ...: a = q.collect(streaming=False)
    ...: path: pathlib.Path = "Distinct3.csv"
    ...: a.write_csv(path, separator=",")
t = time.time()

In [91]: t = time.time()

In [92]: t -s
Out[92]: 83.62902665138245

In [93]: s = time.time()
    ...: a = q.collect(streaming=True)
    ...: path: pathlib.Path = "Distinct2.csv"
    ...: a.write_csv(path, separator=",")
t = time.time()

In [94]: t = time.time()

In [95]: t -s
Out[95]: 92.26304984092712

In [96]: q = (
    ...:    pl.scan_csv("1000MILLION-ROWS.csv")
    ...:    #.head(800_000_000)
    ...:   .select(["Ledger", "Account", "PartNo", "Contact","Project","Unit Code", "D/C","Currency"]).unique()
    ...:   )

In [97]: s = time.time()
    ...: a = q.collect(streaming=False)
    ...: path: pathlib.Path = "Distinct3.csv"
    ...: a.write_csv(path, separator=",")
t = time.time()

In [98]: t = time.time()

In [99]: t -s
Out[99]: 148.59188532829285

In [100]: s = time.time()
     ...: a = q.collect(streaming=True)
     ...: path: pathlib.Path = "Distinct2.csv"
     ...: a.write_csv(path, separator=",")
t = time.time()

In [101]: t = time.time()

In [102]: t -s
Out[102]: 235.2980933189392

@financialclose (Author) commented Mar 28, 2023

If the streaming issue can be fixed properly, I will test JoinTable with 1 billion rows using Ledger, Account and Project as a composite key.

@financialclose (Author)

I used polars 0.16.17 to test again with streaming, but it returns the error "memory allocation of 50008 bytes failed".

import polars as pl
import time
import pathlib
s = time.time()

q = (
    pl.scan_csv("Input/1000MILLION-ROWS.CSV")
    .select(["Ledger", "Account", "PartNo", "Contact", "Project", "Unit Code", "D/C", "Currency"])
    .unique()
)

a = q.collect(streaming=True)
path: pathlib.Path = "Distinct.csv"
a.write_csv(path, separator=",")

e = time.time()
print("Polars Loading Time = {}".format(e-s))

If not using streaming, it returns the error "memory allocation of 624658120 bytes failed".

@ritchie46 (Member)

Yes, you are running out of memory. We are still working on out-of-core processing.

@financialclose (Author)

Using the current version 0.17, both distinct and groupby can handle 1 billion rows. For GroupBy I use this script:

q = (
    pl.scan_csv("Input/1000MILLION-ROWS.CSV")
    .groupby(by=["Ledger", "Account", "PartNo", "Contact", "Project", "Unit Code", "D/C", "Currency"])
    .agg([
        pl.count("Quantity").alias("Quantity(Count)"),
        pl.max("Quantity").alias("Quantity(Max)"),
        pl.min("Quantity").alias("Quantity(Min)"),
        pl.sum("Quantity").alias("Quantity(Sum)"),
        pl.sum("Original Amount").alias("Original Amount(Sum)"),
        pl.sum("Base Amount").alias("Base Amount(Sum)"),
    ])
)

a = q.collect(streaming=True)
path: pathlib.Path = "GroupBy.csv"
a.write_csv(path, separator=",")

Processing time (32 GB RAM & 8-core CPU):
Distinct: 160 s
GroupBy: 203 s

But it fails in JoinTable: "memory allocation of 624658120624658120624658120624658120 bytes failed".
It may be that the setup below is not appropriate. Data file: https://github.com/financialclose/benchmarking
If I use only 100 million rows, it completes in 85.4 s with generated output.

Script for JoinTable:

transaction = pl.read_csv("Input/1000MILLION-ROWS.CSV")
master = pl.read_csv("Input/MASTER.CSV")
joined_table = transaction.join(master, on=["Ledger","Account","Project"], how="inner")
joined_table.write_csv("joined_table.csv")
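One thing that might reduce peak memory here (a sketch only; whether the join itself can stream depends on the Polars version, and the engine may fall back to in-memory execution for unsupported parts of the plan) is to keep both inputs lazy instead of eagerly reading them with read_csv:

import polars as pl

# Keep both sides lazy so collection can use the streaming engine where supported.
transaction = pl.scan_csv("Input/1000MILLION-ROWS.CSV")
master = pl.scan_csv("Input/MASTER.CSV")

joined = transaction.join(master, on=["Ledger", "Account", "Project"], how="inner")
joined.collect(streaming=True).write_csv("joined_table.csv")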
