import logging
import copy
from functools import wraps
from collections import OrderedDict
import six
from .result import ElasticResult
log = logging.getLogger(__name__)
ARBITRARILY_LARGE_SIZE = 100000
[docs]def generative(f):
"""
A decorator to wrap query methods to make them automatically generative.
"""
@wraps(f)
def wrapped(self, *args, **kwargs):
self = self._generate()
f(self, *args, **kwargs)
return self
return wrapped
[docs]def filters(f):
"""
A convenience decorator to wrap query methods that are adding filters. To
use, simply make a method that returns a filter dict in elasticsearch's
JSON object format.
Should be used inside @generative (listed after in decorator order).
"""
@wraps(f)
def wrapped(self, *args, **kwargs):
val = f(self, *args, **kwargs)
self.filters.append(val)
return wrapped
[docs]class ElasticQuery(object):
"""
Represents a query to be issued against the ES backend.
"""
def __init__(self, client, classes=None, q=None):
if not q:
q = self.match_all_query()
elif isinstance(q, six.string_types):
q = self.text_query(q, operator='and')
self.base_query = q
self.client = client
self.classes = classes
self.filters = []
self.suggests = {}
self.sorts = OrderedDict()
self.facets = {}
self._size = None
self._start = None
def _generate(self):
s = self.__class__.__new__(self.__class__)
s.__dict__ = self.__dict__.copy()
s.filters = list(s.filters)
s.suggests = s.suggests.copy()
s.sorts = s.sorts.copy()
s.facets = s.facets.copy()
return s
@staticmethod
[docs] def match_all_query():
"""
Static method to return a filter dict which will match everything. Can
be overridden in a subclass to customize behavior.
"""
return {
'match_all': {}
}
@staticmethod
[docs] def text_query(phrase, operator="and"):
"""
Static method to return a filter dict to match a text search. Can be
overridden in a subclass to customize behavior.
"""
return {
"match": {
'_all': {
"query": phrase,
"operator": operator,
"analyzer": "content"
}
}
}
@generative
@filters
[docs] def filter_term(self, term, value):
"""
Filter for documents where the field ``term`` matches ``value``.
"""
return {'term': {term: value}}
@generative
@filters
[docs] def filter_terms(self, term, value):
"""
Filter for documents where the field ``term`` matches one of the
elements in ``value`` (which should be a sequence).
"""
return {'terms': {term: value}}
@generative
@filters
[docs] def filter_value_upper(self, term, upper):
"""
Filter for documents where term is numerically less than ``upper``.
"""
return {'range': {term: {'to': upper, 'include_upper': True}}}
@generative
@filters
[docs] def filter_value_lower(self, term, lower):
"""
Filter for documents where term is numerically more than ``lower``.
"""
return {'range': {term: {'from': lower, 'include_lower': True}}}
@generative
@filters
def filter_has_parent_term(self, parent_type, term, value):
return {
'has_parent': {
'parent_type': parent_type,
'query': {
'term': {
term: value,
}
}
}
}
@generative
[docs] def order_by(self, key, desc=False):
"""
Sort results by the field ``key``. Default to ascending order, unless
``desc`` is True.
"""
order = "desc" if desc else "asc"
self.sorts['order_by_%s' % key] = {key: {"order": order}}
@generative
[docs] def add_facet(self, facet):
"""
Add a query facet, to return data used for the implementation of
faceted search (e.g. returning result counts for given possible
sub-queries).
The facet should be supplied as a dict in the format that ES uses for
representation.
It is recommended to use the helper methods ``add_term_facet()`` or
``add_range_facet()`` where possible.
"""
self.facets.update(facet)
[docs] def add_term_facet(self, name, size, field):
"""
Add a term facet.
ES will return data about document counts for the top sub-queries (by
document count) in which the results are filtered by a given term.
"""
return self.add_facet({
name: {
'terms': {
'field': field,
'size': size
}
}
})
[docs] def add_range_facet(self, name, field, ranges):
"""
Add a range facet.
ES will return data about documetn counts for the top sub-queries (by
document count) inw hich the results are filtered by a given numerical
range.
"""
return self.add_facet({
name: {
'range': {
'field': field,
'ranges': ranges,
}
}
})
@generative
def add_term_suggester(self, name, field, text, sort='score',
suggest_mode='missing'):
self.suggests[name] = {
'text': text,
'term': {
'field': field,
'sort': sort,
'suggest_mode': suggest_mode,
}
}
@generative
[docs] def offset(self, n):
"""
When returning results, start at document ``n``.
"""
if self._start is not None:
raise ValueError('This query already has an offset applied.')
self._start = n
start = offset
@generative
[docs] def limit(self, n):
"""
When returning results, stop at document ``n``.
"""
if self._size is not None:
raise ValueError('This query already has a limit applied.')
self._size = n
size = limit
def _search(self, start=None, size=None, fields=None):
q = copy.copy(self.base_query)
if self.filters:
f = {'and': self.filters}
q = {
'filtered': {
'filter': f,
'query': q,
}
}
q_start = self._start or 0
q_size = self._size or ARBITRARILY_LARGE_SIZE
if size is not None:
q_size = max(0,
size if q_size is None else
min(size, q_size - q_start))
if start is not None:
q_start = q_start + start
body = {
'sort': list(self.sorts.values()),
'query': q
}
if self.facets:
body['facets'] = self.facets
if self.suggests:
body['suggest'] = self.suggests
return self.client.search(body, classes=self.classes, fields=fields,
size=q_size, from_=q_start)
[docs] def execute(self, start=None, size=None, fields=None):
"""
Execute this query and return a result set.
"""
return ElasticResult(self._search(start=start, size=size,
fields=fields))
[docs] def count(self):
"""
Execute this query to determine the number of documents that would be
returned, but do not actually fetch documents. Returns an int.
"""
res = self._search(size=0)
return res['hits']['total']