from collections import defaultdict
import contextlib
import datetime
from nose.tools import set_trace
import json
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk as elasticsearch_bulk
from elasticsearch.exceptions import (
RequestError,
ElasticsearchException,
)
from elasticsearch_dsl import (
Index,
MultiSearch,
Search,
SF,
)
from elasticsearch_dsl.query import (
Bool,
DisMax,
Exists,
FunctionScore,
Match,
MatchAll,
MatchNone,
MatchPhrase,
MultiMatch,
Nested,
Query as BaseQuery,
SimpleQueryString,
Term,
Terms,
)
from spellchecker import SpellChecker
from flask_babel import lazy_gettext as _
from config import (
Configuration,
CannotLoadConfiguration,
)
from classifier import (
KeywordBasedClassifier,
GradeLevelClassifier,
AgeClassifier,
Classifier,
)
from facets import FacetConstants
from metadata_layer import IdentifierData
from model import (
numericrange_to_tuple,
Collection,
Contributor,
ConfigurationSetting,
DataSource,
Edition,
ExternalIntegration,
Identifier,
Library,
Work,
WorkCoverageRecord,
)
from lane import Pagination
from monitor import WorkSweepMonitor
from coverage import (
CoverageFailure,
WorkPresentationProvider,
)
from problem_details import INVALID_INPUT
from selftest import (
HasSelfTests,
SelfTestResult,
)
from util.personal_names import display_name_to_sort_name
from util.problem_detail import ProblemDetail
from util.stopwords import ENGLISH_STOPWORDS
import os
import logging
import re
import time
@contextlib.contextmanager
def mock_search_index(mock=None):
"""Temporarily mock the ExternalSearchIndex implementation
returned by the load() class method.
"""
try:
ExternalSearchIndex.MOCK_IMPLEMENTATION = mock
yield mock
finally:
ExternalSearchIndex.MOCK_IMPLEMENTATION = None
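# A rough usage sketch (hypothetical test code): while the `with` block is
# active, load() returns the mock instead of a real ExternalSearchIndex.
#
#   mock = MockExternalSearchIndex()
#   with mock_search_index(mock):
#       assert ExternalSearchIndex.load(_db) is mock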
class ExternalSearchIndex(HasSelfTests):
NAME = ExternalIntegration.ELASTICSEARCH
# A test may temporarily set this to a mock of this class.
# While that's true, load() will return the mock instead of
# instantiating new ExternalSearchIndex objects.
MOCK_IMPLEMENTATION = None
WORKS_INDEX_PREFIX_KEY = u'works_index_prefix'
DEFAULT_WORKS_INDEX_PREFIX = u'circulation-works'
TEST_SEARCH_TERM_KEY = u'test_search_term'
DEFAULT_TEST_SEARCH_TERM = u'test'
work_document_type = 'work-type'
__client = None
CURRENT_ALIAS_SUFFIX = 'current'
VERSION_RE = re.compile('-v([0-9]+)$')
SETTINGS = [
{ "key": ExternalIntegration.URL, "label": _("URL"), "required": True, "format": "url" },
{ "key": WORKS_INDEX_PREFIX_KEY, "label": _("Index prefix"),
"default": DEFAULT_WORKS_INDEX_PREFIX,
"required": True,
"description": _("Any Elasticsearch indexes needed for this application will be created with this unique prefix. In most cases, the default will work fine. You may need to change this if you have multiple application servers using a single Elasticsearch server.")
},
{ "key": TEST_SEARCH_TERM_KEY,
"label": _("Test search term"),
"default": DEFAULT_TEST_SEARCH_TERM,
"description": _("Self tests will use this value as the search term.")
}
]
SITEWIDE = True
@classmethod
def reset(cls):
"""Resets the __client object to None so a new configuration
can be applied during object initialization.
This method is only intended for use in testing.
"""
cls.__client = None
@classmethod
def search_integration(cls, _db):
"""Look up the ExternalIntegration for ElasticSearch."""
return ExternalIntegration.lookup(
_db, ExternalIntegration.ELASTICSEARCH,
goal=ExternalIntegration.SEARCH_GOAL
)
@classmethod
def works_prefixed(cls, _db, value):
"""Prefix the given value with the prefix to use when generating index
and alias names.
:return: A string "{prefix}-{value}", or None if no Elasticsearch integration is configured.
"""
integration = cls.search_integration(_db)
if not integration:
return None
setting = integration.setting(cls.WORKS_INDEX_PREFIX_KEY)
prefix = setting.value_or_default(cls.DEFAULT_WORKS_INDEX_PREFIX)
return prefix + '-' + value
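    # For example, with the default prefix configured (illustrative values):
    #
    #   works_prefixed(_db, "v4")      -> "circulation-works-v4"
    #   works_prefixed(_db, "current") -> "circulation-works-current"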
@classmethod
def works_index_name(cls, _db):
"""Look up the name of the search index.
It's possible, but unlikely, that the search index alias will
point to some other index. But if there were no indexes, and a
new one needed to be created, this would be the name of that
index.
"""
return cls.works_prefixed(_db, CurrentMapping.version_name())
@classmethod
def works_alias_name(cls, _db):
"""Look up the name of the search index alias."""
return cls.works_prefixed(_db, cls.CURRENT_ALIAS_SUFFIX)
@classmethod
def load(cls, _db, *args, **kwargs):
"""Load a generic implementation."""
if cls.MOCK_IMPLEMENTATION:
return cls.MOCK_IMPLEMENTATION
return cls(_db, *args, **kwargs)
def __init__(self, _db, url=None, works_index=None, test_search_term=None,
in_testing=False, mapping=None):
"""Constructor
:param in_testing: Set this to true if you don't want an
Elasticsearch client to be created, e.g. because you're
running a unit test of the constructor.
:param mapping: A custom Mapping object, for use in unit tests. By
default, the most recent mapping will be instantiated.
"""
self.log = logging.getLogger("External search index")
self.works_index = None
self.works_alias = None
integration = None
self.mapping = mapping or CurrentMapping()
if isinstance(url, ExternalIntegration):
# This is how the self-test initializes this object.
integration = url
url = integration.url
if not _db:
raise CannotLoadConfiguration(
"Cannot load Elasticsearch configuration without a database.",
)
if not url or not works_index:
integration = self.search_integration(_db)
if not integration:
raise CannotLoadConfiguration(
"No Elasticsearch integration configured."
)
url = url or integration.url
if not works_index:
works_index = self.works_index_name(_db)
test_search_term = integration.setting(
self.TEST_SEARCH_TERM_KEY).value
if not url:
raise CannotLoadConfiguration(
"No URL configured to Elasticsearch server."
)
self.test_search_term = (
test_search_term or self.DEFAULT_TEST_SEARCH_TERM
)
if not in_testing:
if not ExternalSearchIndex.__client:
use_ssl = url.startswith('https://')
self.log.info(
"Connecting to index %s in Elasticsearch cluster at %s",
works_index, url
)
ExternalSearchIndex.__client = Elasticsearch(
url, use_ssl=use_ssl, timeout=20, maxsize=25
)
self.indices = self.__client.indices
self.index = self.__client.index
self.delete = self.__client.delete
self.exists = self.__client.exists
self.put_script = self.__client.put_script
# Sets self.works_index and self.works_alias values.
# Document upload runs against the works_index.
# Search queries run against works_alias.
if works_index and integration and not in_testing:
try:
self.set_works_index_and_alias(_db)
except RequestError, e:
# This is almost certainly a problem with our code,
# not a communications error.
raise e
except ElasticsearchException, e:
raise CannotLoadConfiguration(
"Exception communicating with Elasticsearch server: %s" %
repr(e)
)
self.search = Search(using=self.__client, index=self.works_alias)
def bulk(docs, **kwargs):
return elasticsearch_bulk(self.__client, docs, **kwargs)
self.bulk = bulk
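    # A rough construction sketch (hypothetical values). Configuration is
    # normally read from the ExternalIntegration, but a URL and index name
    # may also be passed in directly:
    #
    #   index = ExternalSearchIndex(
    #       _db, url="http://localhost:9200",
    #       works_index="circulation-works-v4"
    #   )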
def set_works_index_and_alias(self, _db):
"""Finds or creates the works_index and works_alias based on
the current configuration.
"""
# The index name to use is the one known to be right for this
# version.
self.works_index = self.__client.works_index = self.works_index_name(_db)
if not self.indices.exists(self.works_index):
# That index doesn't actually exist. Set it up.
self.setup_index()
# Make sure the alias points to the most recent index.
self.setup_current_alias(_db)
# Make sure the stored scripts for the latest mapping exist.
self.set_stored_scripts()
def setup_current_alias(self, _db):
"""Finds or creates the works_alias as named by the current site
settings.
If the resulting alias exists and is affixed to a different
index or if it can't be generated for any reason, the alias will
        not be created or moved. Instead, the search client will use
        the works_index directly for search queries.
"""
alias_name = self.works_alias_name(_db)
alias_is_set = self.indices.exists_alias(name=alias_name)
def _use_as_works_alias(name):
self.works_alias = self.__client.works_alias = name
if alias_is_set:
# The alias exists on the Elasticsearch server, so it must
# point _somewhere.
exists_on_works_index = self.indices.exists_alias(
index=self.works_index, name=alias_name
)
if exists_on_works_index:
# It points to the index we were expecting it to point to.
# Use it.
_use_as_works_alias(alias_name)
else:
# The alias exists but it points somewhere we didn't
# expect. Rather than changing how the alias works and
# then using the alias, use the index directly instead
# of going through the alias.
_use_as_works_alias(self.works_index)
return
# Create the alias and search against it.
response = self.indices.put_alias(
index=self.works_index, name=alias_name
)
if not response.get('acknowledged'):
self.log.error("Alias '%s' could not be created", alias_name)
# Work against the index instead of an alias.
_use_as_works_alias(self.works_index)
return
_use_as_works_alias(alias_name)
def setup_index(self, new_index=None, **index_settings):
"""Create the search index with appropriate mapping.
This will destroy the search index, and all works will need
to be indexed again. In production, don't use this on an
existing index. Use it to create a new index, then change the
alias to point to the new index.
"""
index_name = new_index or self.works_index
if self.indices.exists(index_name):
self.log.info("Deleting index %s", index_name)
self.indices.delete(index_name)
self.log.info("Creating index %s", index_name)
body = self.mapping.body()
body.setdefault('settings', {}).update(index_settings)
index = self.indices.create(index=index_name, body=body)
def set_stored_scripts(self):
for name, definition in self.mapping.stored_scripts():
# Make sure the name of the script is scoped and versioned.
if not name.startswith("simplified."):
name = self.mapping.script_name(name)
# If only the source code was provided, configure it as a
# Painless script.
if isinstance(definition, basestring):
definition = dict(script=dict(lang="painless", source=definition))
# Put it in the database.
self.put_script(name, definition)
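    # An illustrative (hypothetical) entry from stored_scripts(): a bare
    # Painless source string such as
    #
    #   ("work_last_update", "doc['last_update_time'].value")
    #
    # is wrapped as {"script": {"lang": "painless", "source": ...}} and
    # stored under the scoped name "simplified.work_last_update.v4".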
def transfer_current_alias(self, _db, new_index):
"""Force -current alias onto a new index"""
if not self.indices.exists(index=new_index):
raise ValueError(
"Index '%s' does not exist on this client." % new_index)
current_base_name = self.base_index_name(self.works_index)
new_base_name = self.base_index_name(new_index)
if new_base_name != current_base_name:
raise ValueError(
("Index '%s' is not in series with current index '%s'. "
"Confirm the base name (without version number) of both indices"
"is the same.") % (new_index, self.works_index))
self.works_index = self.__client.works_index = new_index
alias_name = self.works_alias_name(_db)
exists = self.indices.exists_alias(name=alias_name)
if not exists:
# The alias doesn't already exist. Set it.
self.setup_current_alias(_db)
return
# We know the alias already exists. Before we set it to point
# to self.works_index, we may need to remove it from some
# other indices.
other_indices = self.indices.get_alias(name=alias_name).keys()
if self.works_index in other_indices:
# If the alias already points to the works index,
# that's fine -- we want to see if it points to any
# _other_ indices.
other_indices.remove(self.works_index)
if other_indices:
# The alias exists on one or more other indices. Remove
# the alias altogether, then put it back on the works
# index.
self.indices.delete_alias(index='_all', name=alias_name)
self.indices.put_alias(
index=self.works_index, name=alias_name
)
self.works_alias = self.__client.works_alias = alias_name
def base_index_name(self, index_or_alias):
"""Removes version or current suffix from base index name"""
current_re = re.compile(self.CURRENT_ALIAS_SUFFIX+'$')
base_works_index = re.sub(current_re, '', index_or_alias)
base_works_index = re.sub(self.VERSION_RE, '', base_works_index)
return base_works_index
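    # For example (illustrative value), the version suffix is stripped:
    #
    #   base_index_name("circulation-works-v4") -> "circulation-works"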
def create_search_doc(self, query_string, filter, pagination,
debug):
query = Query(query_string, filter)
search = query.build(self.search, pagination)
if debug:
search = search.extra(explain=True)
if filter is not None and filter.min_score is not None:
search = search.extra(min_score=filter.min_score)
fields = None
if debug:
# Don't restrict the fields at all -- get everything.
# This makes it easy to investigate everything about the
# results we do get.
fields = ['*']
else:
# All we absolutely need is the work ID, which is a
# key into the database, plus the values of any script fields,
# which represent data not available through the database.
fields = ["work_id"]
if filter:
fields += filter.script_fields.keys()
# Change the Search object so it only retrieves the fields
# we're asking for.
if fields:
search = search.source(fields)
return search
def query_works(self, query_string, filter=None, pagination=None,
debug=False):
"""Run a search query.
This works by calling query_works_multi().
:param query_string: The string to search for.
:param filter: A Filter object, used to filter out works that
would otherwise match the query string.
:param pagination: A Pagination object, used to get a subset
of the search results.
:param debug: If this is True, debugging information will
be gathered and logged. The search query will ask
ElasticSearch for all available fields, not just the
fields known to be used by the feed generation code. This
all comes at a slight performance cost.
:return: A list of Hit objects containing information about
the search results. This will include the values of any
script fields calculated by ElasticSearch during the
search process.
"""
if isinstance(filter, Filter) and filter.match_nothing is True:
# We already know this search should match nothing. We
# don't even need to perform the search.
return []
pagination = pagination or Pagination.default()
query_data = (query_string, filter, pagination)
[result] = self.query_works_multi([query_data], debug)
return result
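    # A rough usage sketch (hypothetical setup; Filter and Pagination are
    # defined elsewhere in this codebase):
    #
    #   search_index = ExternalSearchIndex.load(_db)
    #   hits = search_index.query_works(
    #       "dinosaur", filter=Filter(collections=[collection]),
    #       pagination=Pagination.default()
    #   )
    #   work_ids = [hit.work_id for hit in hits]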
def query_works_multi(self, queries, debug=False):
"""Run several queries simultaneously and return the results
as a big list.
:param queries: A list of (query string, Filter, Pagination) 3-tuples,
each representing an Elasticsearch query to be run.
:yield: A sequence of lists, one per item in `queries`,
each containing the search results from that
(query string, Filter, Pagination) 3-tuple.
"""
# If the works alias is not set, all queries return empty.
#
# TODO: Maybe an unset works_alias should raise
# CannotLoadConfiguration in the constructor. Then we wouldn't
# have to worry about this.
        if not self.works_alias:
            for q in queries:
                yield []
            # The queries can't be run, so stop here rather than fall
            # through and execute them anyway.
            return
# Create a MultiSearch.
multi = MultiSearch(using=self.__client)
# Give it a Search object for every query definition passed in
# as part of `queries`.
for (query_string, filter, pagination) in queries:
search = self.create_search_doc(
query_string, filter=filter, pagination=pagination, debug=debug
)
function_scores = filter.scoring_functions if filter else None
if function_scores:
function_score = FunctionScore(
query=dict(match_all=dict()),
functions=function_scores,
score_mode="sum"
)
search = search.query(function_score)
multi = multi.add(search)
a = time.time()
# NOTE: This is the code that actually executes the ElasticSearch
# request.
resultset = [x for x in multi.execute()]
if debug:
b = time.time()
self.log.debug(
"Elasticsearch query %r completed in %.3fsec",
query_string, b-a
)
for results in resultset:
for i, result in enumerate(results):
self.log.debug(
'%02d "%s" (%s) work=%s score=%.3f shard=%s',
i, result.sort_title, result.sort_author, result.meta['id'],
result.meta.explanation['value'] or 0, result.meta['shard']
)
        for results, (query_string, filter, pagination) in zip(
            resultset, queries
        ):
            # Tell each query's own Pagination object about the page that
            # was just 'loaded' so that Pagination.next_page will work.
            #
            # The pagination itself happened inside the Elasticsearch
            # server when the query ran.
            pagination.page_loaded(results)
            yield results
def count_works(self, filter):
"""Instead of retrieving works that match `filter`, count the total."""
if filter is not None and filter.match_nothing is True:
# We already know that the filter should match nothing.
# We don't even need to perform the count.
return 0
qu = self.create_search_doc(
query_string=None, filter=filter, pagination=None, debug=False
)
return qu.count()
def bulk_update(self, works, retry_on_batch_failure=True):
"""Upload a batch of works to the search index at once."""
if not works:
# There's nothing to do. Don't bother making any requests
# to the search index.
return [], []
time1 = time.time()
needs_add = []
successes = []
for work in works:
needs_add.append(work)
# Add/update any works that need adding/updating.
docs = Work.to_search_documents(needs_add)
for doc in docs:
doc["_index"] = self.works_index
doc["_type"] = self.work_document_type
time2 = time.time()
success_count, errors = self.bulk(
docs,
raise_on_error=False,
raise_on_exception=False,
)
# If the entire update failed, try it one more time before
# giving up on the batch.
if len(errors) == len(docs):
if retry_on_batch_failure:
self.log.info("Elasticsearch bulk update timed out, trying again.")
return self.bulk_update(needs_add, retry_on_batch_failure=False)
else:
docs = []
time3 = time.time()
self.log.info("Created %i search documents in %.2f seconds" % (len(docs), time2 - time1))
self.log.info("Uploaded %i search documents in %.2f seconds" % (len(docs), time3 - time2))
doc_ids = [d['_id'] for d in docs]
# We weren't able to create search documents for these works, maybe
# because they don't have presentation editions yet.
def get_error_id(error):
return error.get('data', {}).get('_id', None) or error.get('index', {}).get('_id', None)
error_ids = [get_error_id(error) for error in errors]
missing_works = [
work for work in works
if work.id not in doc_ids and work.id not in error_ids
and work not in successes
]
successes.extend(
[work for work in works
if work.id in doc_ids and work.id not in error_ids]
)
failures = []
        for missing in missing_works:
            failures.append((missing, "Work not indexed"))
for error in errors:
error_id = get_error_id(error)
work = None
works_with_error = [work for work in works if work.id == error_id]
if works_with_error:
work = works_with_error[0]
exception = error.get('exception', None)
error_message = error.get('error', None)
if not error_message:
error_message = error.get('index', {}).get('error', None)
failures.append((work, error_message))
self.log.info("Successfully indexed %i documents, failed to index %i." % (success_count, len(failures)))
return successes, failures
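    # An illustrative call (hypothetical logging): `failures` is a list of
    # (work, error message) pairs.
    #
    #   successes, failures = search_index.bulk_update(works)
    #   for work, message in failures:
    #       logging.error("Failed to index %r: %s", work, message)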
def remove_work(self, work):
"""Remove the search document for `work` from the search index.
"""
args = dict(index=self.works_index, doc_type=self.work_document_type,
id=work.id)
if self.exists(**args):
self.delete(**args)
def _run_self_tests(self, _db, in_testing=False):
# Helper methods for setting up the self-tests:
def _search():
return self.create_search_doc(
self.test_search_term, filter=None,
pagination=None, debug=True
)
def _works():
return self.query_works(
self.test_search_term, filter=None, pagination=None,
debug=True
)
# The self-tests:
def _search_for_term():
titles = [("%s (%s)" %(x.sort_title, x.sort_author)) for x in _works()]
return titles
yield self.run_test(
("Search results for '%s':" %(self.test_search_term)),
_search_for_term
)
def _get_raw_doc():
search = _search()
if in_testing:
if not len(search):
return str(search)
search = search[0]
return json.dumps(search.to_dict(), indent=1)
yield self.run_test(
("Search document for '%s':" %(self.test_search_term)),
_get_raw_doc
)
def _get_raw_results():
return [json.dumps(x.to_dict(), indent=1) for x in _works()]
yield self.run_test(
("Raw search results for '%s':" %(self.test_search_term)),
_get_raw_results
)
def _count_docs():
# The mock methods used in testing return a list, so we have to call len() rather than count().
if in_testing:
return str(len(self.search))
return str(self.search.count())
yield self.run_test(
("Total number of search results for '%s':" %(self.test_search_term)),
_count_docs
)
def _total_count():
return str(self.count_works(None))
yield self.run_test(
"Total number of documents in this search index:",
_total_count
)
def _collections():
result = {}
collections = _db.query(Collection)
for collection in collections:
filter = Filter(collections=[collection])
result[collection.name] = self.count_works(filter)
return json.dumps(result, indent=1)
yield self.run_test(
"Total number of documents per collection:",
_collections
)
class MappingDocument(object):
"""This class knows a lot about how the 'properties' section of an
Elasticsearch mapping document (or one of its subdocuments) is
created.
"""
def __init__(self):
self.properties = {}
self.subdocuments = {}
def add_property(self, name, type, **description):
"""Add a field to the list of properties.
:param name: Name of the field as found in search documents.
:param type: Type of the field. This may be a custom type,
so long as a hook method is defined for that type.
:param description: Description of the field.
"""
# TODO: For some fields we could set index: False here, which
# would presumably lead to a smaller index and faster
# updates. However, it might hurt performance of
# searches. When this code is more mature we can do a
# side-by-side comparison.
defaults = dict(index=True, store=False)
description['type'] = type
for default_name, default_value in defaults.items():
if default_name not in description:
description[default_name] = default_value
hook_method = getattr(self, type + "_property_hook", None)
if hook_method is not None:
hook_method(description)
# TODO: Cross-check the description for correctness. Do the
        # things it mentions actually exist? Better to fail now with a
# useful error than to fail when talking to Elasticsearch.
self.properties[name] = description
def add_properties(self, properties_by_type):
"""Turn a dictionary mapping types to field names into a
bunch of add_property() calls.
Useful when you have a lot of fields that don't need any
customization.
"""
for type, properties in properties_by_type.items():
for name in properties:
self.add_property(name, type)
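    # For example (illustrative field names), this single call:
    #
    #   doc.add_properties({
    #       "boolean": ["presentation_ready"],
    #       "integer": ["work_id"],
    #   })
    #
    # is equivalent to two separate add_property() calls.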
def subdocument(self, name):
"""Create a new HasProperties object and register it as a
sub-document of this one.
"""
subdocument = MappingDocument()
self.subdocuments[name] = subdocument
return subdocument
def basic_text_property_hook(self, description):
"""Hook method to handle the custom 'basic_text'
property type.
This type does not exist in Elasticsearch. It's our name for a
text field that is indexed three times: once using our default
English analyzer ("title"), once using an analyzer with
minimal stemming ("title.minimal") for close matches, and once
using an analyzer that leaves stopwords in place, for searches
that rely on stopwords.
"""
description['type'] = 'text'
description['analyzer'] = 'en_default_text_analyzer'
description['fields'] = {
"minimal": {
"type": "text",
"analyzer": "en_minimal_text_analyzer"
},
"with_stopwords": {
"type": "text",
"analyzer": "en_with_stopwords_text_analyzer"
},
}
def filterable_text_property_hook(self, description):
"""Hook method to handle the custom 'filterable_text'
property type.
This type does not exist in Elasticsearch. It's our name for a
text field that can be used in both queries and filters.
This field is indexed _four_ times -- the three ways a normal
text field is indexed, plus again as an unparsed keyword that
can be used in filters.
"""
self.basic_text_property_hook(description)
description["fields"]["keyword"] = {
"type": "keyword",
"index": True,
"store": False,
"normalizer": "filterable_string",
}
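    # The resulting mapping fragment for a filterable_text field looks
    # roughly like this (sketch, shown for an illustrative field "series"):
    #
    #   "series": {
    #     "type": "text",
    #     "analyzer": "en_default_text_analyzer",
    #     "fields": {
    #       "minimal": {"type": "text", "analyzer": "en_minimal_text_analyzer"},
    #       "with_stopwords": {"type": "text",
    #                          "analyzer": "en_with_stopwords_text_analyzer"},
    #       "keyword": {"type": "keyword", "normalizer": "filterable_string"}
    #     }
    #   }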
class Mapping(MappingDocument):
"""A class that defines the mapping for a particular version of the search index.
Code that won't change between versions can go here. (Or code that
can change between versions without affecting anything.)
"""
VERSION_NAME = None
@classmethod
def version_name(cls):
"""Return the name of this Mapping subclass."""
version = cls.VERSION_NAME
if not version:
raise NotImplementedError("VERSION_NAME not defined")
if not version.startswith('v'):
version = 'v%s' % version
return version
@classmethod
def script_name(cls, base_name):
"""Scope a script name with "simplified" (to avoid confusion with
other applications on the Elasticsearch server), and the
version number (to avoid confusion with other versions *of
this application*, which may implement the same script
differently, on this Elasticsearch server).
"""
return "simplified.%s.%s" % (base_name, cls.version_name())
def __init__(self):
super(Mapping, self).__init__()
self.filters = {}
self.char_filters = {}
self.normalizers = {}
self.analyzers = {}
def create(self, search_client, base_index_name):
"""Ensure that an index exists in `search_client` for this Mapping.
:return: True or False, indicating whether the index was created new.
"""
versioned_index = base_index_name+'-'+self.version_name()
if search_client.indices.exists(index=versioned_index):
return False
else:
search_client.setup_index(new_index=versioned_index)
return True
def sort_author_keyword_property_hook(self, description):
"""Give the `sort_author` property its custom analyzer."""
description['type'] = 'text'
description['analyzer'] = 'en_sort_author_analyzer'
description['fielddata'] = True
def body(self):
"""Generate the body of the mapping document for this version of the
mapping.
"""
settings = dict(
analysis=dict(
filter=self.filters,
char_filter=self.char_filters,
normalizer=self.normalizers,
analyzer=self.analyzers
)
)
# Start with the normally defined properties.
properties = dict(self.properties)
# Add subdocuments as additional properties.
for name, subdocument in self.subdocuments.items():
properties[name] = dict(
type="nested", properties=subdocument.properties
)
mappings = {
ExternalSearchIndex.work_document_type : dict(properties=properties)
}
return dict(settings=settings, mappings=mappings)
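    # The returned document has the top-level shape Elasticsearch expects
    # (sketch):
    #
    #   {"settings": {"analysis": {"filter": ..., "char_filter": ...,
    #                              "normalizer": ..., "analyzer": ...}},
    #    "mappings": {"work-type": {"properties": {...}}}}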
class CurrentMapping(Mapping):
"""The first mapping to support only Elasticsearch 6.
The body of this mapping looks for bibliographic information in
the core document, primarily used for matching search
requests. It also has nested documents, which are used for
filtering and ranking Works when generating other types of
feeds:
* licensepools -- the Work has these LicensePools (includes current
availability as a boolean, but not detailed availability information)
* customlists -- the Work is on these CustomLists
* contributors -- these Contributors worked on the Work
"""
VERSION_NAME = "v4"
    # Use regular expressions to normalize values in sortable fields.
# These regexes are applied in order; that way "H. G. Wells"
# becomes "H G Wells" becomes "HG Wells".
CHAR_FILTERS = {
"remove_apostrophes": dict(
type="pattern_replace", pattern="'",
replacement="",
)
}
AUTHOR_CHAR_FILTER_NAMES = []
for name, pattern, replacement in [
# The special author name "[Unknown]" should sort after everything
        # else. REPLACEMENT CHARACTER (U+FFFD) sorts after ordinary text.
("unknown_author", "\[Unknown\]", u"\N{REPLACEMENT CHARACTER}"),
# Works by a given primary author should be secondarily sorted
# by title, not by the other contributors.
("primary_author_only", "\s+;.*", ""),
# Remove parentheticals (e.g. the full name of someone who
# goes by initials).
("strip_parentheticals", "\s+\([^)]+\)", ""),
# Remove periods from consideration.
("strip_periods", "\.", ""),
# Collapse spaces for people whose sort names end with initials.
("collapse_three_initials", " ([A-Z]) ([A-Z]) ([A-Z])$", " $1$2$3"),
("collapse_two_initials", " ([A-Z]) ([A-Z])$", " $1$2"),
]:
normalizer = dict(type="pattern_replace",
pattern=pattern,
replacement=replacement)
CHAR_FILTERS[name] = normalizer
AUTHOR_CHAR_FILTER_NAMES.append(name)
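    # Worked example (illustrative name): applied in order, these filters
    # turn "Wells, H. G. (Herbert George) ; Verne, Jules" into "Wells, HG":
    #
    #   primary_author_only   -> "Wells, H. G. (Herbert George)"
    #   strip_parentheticals  -> "Wells, H. G."
    #   strip_periods         -> "Wells, H G"
    #   collapse_two_initials -> "Wells, HG"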
def __init__(self):
super(CurrentMapping, self).__init__()
# Set up character filters.
#
self.char_filters = self.CHAR_FILTERS
# This normalizer is used on freeform strings that
# will be used as tokens in filters. This way we can,
# e.g. ignore capitalization when considering whether
# two books belong to the same series or whether two
# author names are the same.
self.normalizers['filterable_string'] = dict(
type="custom", filter=["lowercase", "asciifolding"]
)
# Set up analyzers.
#
# We use three analyzers:
#
# 1. An analyzer based on Elasticsearch's default English
# analyzer, with a normal stemmer -- used as the default
# view of a text field such as 'description'.
#
# 2. An analyzer that's exactly the same as #1 but with a less
# aggressive stemmer -- used as the 'minimal' view of a
# text field such as 'description.minimal'.
#
# 3. An analyzer that's exactly the same as #2 but with
# English stopwords left in place instead of filtered out --
# used as the 'with_stopwords' view of a text field such as
# 'title.with_stopwords'.