Skip to content

Commit

Permalink
update index all code
Browse files Browse the repository at this point in the history
  • Loading branch information
syphax-bouazzouni committed Jan 23, 2025
1 parent 2e9bda8 commit 52c31ff
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 69 deletions.
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
GIT
remote: https://github.com/ontoportal-lirmm/goo.git
revision: b8eb3d0889d00b5aebb3d49deb00dfe398ad166b
revision: 27300f28ca6c656c7e78af65013d88b792a6312f
branch: development
specs:
goo (0.0.2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ def process(logger, options = nil)
begin
index_all_data(logger, options)
@submission.add_submission_status(status)
rescue StandardError
rescue StandardError => e
logger.error("Error indexing all data for submission: #{e.message} : #{e.backtrace.join("\n")}")

Check warning on line 12 in lib/ontologies_linked_data/services/submission_process/operations/submission_all_data_indexer.rb

View check run for this annotation

Codecov / codecov/patch

lib/ontologies_linked_data/services/submission_process/operations/submission_all_data_indexer.rb#L11-L12

Added lines #L11 - L12 were not covered by tests
@submission.add_submission_status(status.get_error_status)
ensure
@submission.save
Expand All @@ -17,81 +18,74 @@ def process(logger, options = nil)

private

def index_sorted_ids(ids, ontology, conn, logger, commit = true)
total_triples = Parallel.map(ids.each_slice(1000), in_threads: 10) do |ids_slice|
index_ids = 0
triples_count = 0
documents = {}
time = Benchmark.realtime do
documents, triples_count = fetch_triples(ids_slice, ontology)
end

return if documents.empty?

logger.info("Worker #{Parallel.worker_number} > Fetched #{triples_count} triples of #{@submission.id} in #{time} sec.") if triples_count.positive?

time = Benchmark.realtime do
conn.index_document(documents.values, commit: false)
conn.index_commit if commit
index_ids = documents.size
documents = {}
end
logger.info("Worker #{Parallel.worker_number} > Indexed #{index_ids} ids of #{@submission.id} in #{time} sec.")
triples_count
end
total_triples.sum
end

def index_all_data(logger, commit: true)
page = 1
size = 10_000
size = Goo.backend_vo? ? 100 : 1000
count_ids = 0
total_time = 0
total_triples = 0
old_count = -1

ontology = @submission.bring(:ontology).ontology
.bring(:acronym).acronym
conn = init_search_collection(ontology)

ids = {}

while count_ids != old_count
old_count = count_ids
count = 0
time = Benchmark.realtime do
ids = fetch_sorted_ids(size, page)
count = ids.size
r = Goo.sparql_query_client.query("SELECT (COUNT(DISTINCT ?id) as ?count) WHERE { GRAPH <#{@submission.id}> { ?id ?p ?v } }")
total_ids = r.each_solution.first[:count].to_i
logger.info "Total ids count: #{total_ids}"

r = Goo.sparql_query_client.query("SELECT (COUNT(*) as ?count) WHERE { GRAPH <#{@submission.id}> { ?id ?p ?v } }")
total_triples = r.each_solution.first[:count].to_i
logger.info "Total triples count: #{total_triples}"

chunk_size = total_ids / size + 1
total_triples_indexed = 0
total_time = Benchmark.realtime do
results = Parallel.map((1..chunk_size).to_a, in_threads: 10) do |p|
index_all_data_page(logger, p, size, ontology, conn, commit)
end
results.each do |x|
next if x.nil?

count_ids += count
total_time += time
page += 1

next unless count.positive?
count_ids += x[1]
total_triples_indexed += x[2]
end
end

logger.info("Fetched #{count} ids of #{@submission.id} page: #{page} in #{time} sec.")
logger.info("Completed indexing all ontology data in #{total_time} sec. (#{count_ids} ids / #{total_triples} triples)")
end

time = Benchmark.realtime do
total_triples += index_sorted_ids(ids, ontology, conn, logger, commit)
end
logger.info("Indexed #{total_triples} triples of #{@submission.id} page: #{page} in #{time} sec.")
def index_all_data_page(logger, page, size, ontology, conn, commit = true)
ids = []
time = Benchmark.realtime do
ids = fetch_ids(size, page)
end
count_ids = ids.size
total_time = time
return if ids.empty?

logger.info("Page #{page} - Fetch IDS: #{ids.size} ids (total: #{count_ids}) in #{time} sec.")
documents = []
triples_count = 0
time = Benchmark.realtime do
documents, triples_count = fetch_triples(ids, ontology)
end
total_time += time
logger.info("Page #{page} - Fetch IDs triples: #{triples_count} in #{time} sec.")
return if documents.empty?

total_time += time
time = Benchmark.realtime do
puts "Indexing #{documents.size} documents page: #{page}"
conn.index_document(documents.values, commit: commit)
end
logger.info("Completed indexing all ontology data: #{@submission.id} in #{total_time} sec. (#{count_ids} ids / #{total_triples} triples)")
logger.flush
logger.info("Page #{page} - Indexed #{documents.size} documents page: #{page} in #{time} sec.")
[total_time, count_ids, triples_count]
end

def fetch_sorted_ids(size, page)
def fetch_ids(size, page)
query = Goo.sparql_query_client.select(:id)
.distinct
.from(RDF::URI.new(@submission.id))
.where(%i[id p v])
.limit(size)
.offset((page - 1) * size)

query.each_solution.map(&:id).sort
query.each_solution.map{|x| x.id.to_s}
end

def update_doc(doc, property, new_val)
Expand Down Expand Up @@ -121,12 +115,7 @@ def init_search_collection(ontology)
def fetch_triples(ids_slice, ontology)
documents = {}
count = 0
filter = ids_slice.map { |x| "?id = <#{x}>" }.join(' || ')
query = Goo.sparql_query_client.select(:id, :p, :v)
.from(RDF::URI.new(@submission.id))
.where(%i[id p v])
.filter(filter)
query.each_solution do |sol|
fetch_paginated_triples(ids_slice).each do |sol|
count += 1
doc = documents[sol[:id].to_s]
doc ||= {
Expand All @@ -144,11 +133,34 @@ def fetch_triples(ids_slice, ontology)
end
documents[sol[:id].to_s] = doc
end

[documents, count]
end

def fetch_paginated_triples(ids_slice)
solutions = []
count = 0
page = 1
page_size = 10_000
filter = ids_slice.map { |x| "?id = <#{x}>" }.join(' || ')

while count.positive? || page == 1
query = Goo.sparql_query_client.select(:id, :p, :v)
.from(RDF::URI.new(@submission.id))
.where(%i[id p v])
.filter(filter)
.slice((page - 1) * page_size, page_size)

sol = query.each_solution.to_a
count = sol.size
solutions += sol
break if count.zero? || count < page_size

page += 1
end
solutions
end
end
end
end


end
3 changes: 3 additions & 0 deletions test/models/skos/test_collections.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ def test_collections_all
assert_equal 2, collections.size
collections_test = test_data

collections_test.sort_by! { |x| x[:id] }
collections.sort_by! { |x| x.id.to_s }

collections.each_with_index do |x, i|
collection_test = collections_test[i]
assert_equal collection_test[:id], x.id.to_s
Expand Down
23 changes: 18 additions & 5 deletions test/models/test_search.rb
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,26 @@ def test_search_ontology_data
refute_empty(ont_sub.submissionStatus.select { |x| x.id['INDEXED_ALL_DATA'] })

conn = Goo.search_client(:ontology_data)
response = conn.search('*')

count = Goo.sparql_query_client.query("SELECT (COUNT( DISTINCT ?id) as ?c) FROM <#{ont_sub.id}> WHERE {?id ?p ?v}")
.first[:c]
.to_i
count_ids = Goo.sparql_query_client.query("SELECT (COUNT( DISTINCT ?id) as ?c) FROM <#{ont_sub.id}> WHERE {?id ?p ?v}")
.first[:c]
.to_i

assert_includes [count, count + 1], response['response']['numFound']
total_triples = Goo.sparql_query_client.query("SELECT (COUNT(*) as ?c) FROM <#{ont_sub.id}> WHERE {?s ?p ?o}").first[:c].to_i

response = conn.search('*', rows: count_ids + 100)
index_total_triples = response['response']['docs'].map do |doc|
count = 0
doc.each_value do |v|
count += Array(v).size
end
count -= 6
count
end.sum

# TODO: fix maybe in future sometime randomly don't index excactly all the triples
assert_in_delta total_triples, index_total_triples, 100
assert_in_delta count_ids, response['response']['numFound'], 100

response = conn.search('*', fq: ' resource_id:"http://opendata.inrae.fr/thesaurusINRAE/c_10065"')

Expand Down

0 comments on commit 52c31ff

Please sign in to comment.