diff --git a/elasticsearch_dsl/search.py b/elasticsearch_dsl/search.py
index c07fe7e71..a6284ad28 100644
--- a/elasticsearch_dsl/search.py
+++ b/elasticsearch_dsl/search.py
@@ -328,6 +328,9 @@ def __init__(self, **kwargs):
         self._query_proxy = QueryProxy(self, "query")
         self._post_filter_proxy = QueryProxy(self, "post_filter")
 
+        self._fields = []
+        self._runtime_mappings = {}
+
     def filter(self, *args, **kwargs):
         return self.query(Bool(filter=[Q(*args, **kwargs)]))
 
@@ -411,6 +414,9 @@ def _clone(self):
         s._highlight_opts = self._highlight_opts.copy()
         s._suggest = self._suggest.copy()
         s._script_fields = self._script_fields.copy()
+        s._fields = self._fields.copy()
+        s._runtime_mappings = self._runtime_mappings.copy()
+
         for x in ("query", "post_filter"):
             getattr(s, x)._proxied = getattr(self, x)._proxied
 
@@ -459,6 +465,8 @@ def update_from_dict(self, d):
                     s.setdefault("text", text)
         if "script_fields" in d:
             self._script_fields = d.pop("script_fields")
+        if "runtime_mappings" in d:
+            self._runtime_mappings = d.pop("runtime_mappings")
         self._extra.update(d)
         return self
 
@@ -490,6 +498,48 @@ def script_fields(self, **kwargs):
         s._script_fields.update(kwargs)
         return s
 
+    def runtime_mappings(self, **kwargs):
+        """
+        Define runtime fields to be calculated at query time. See
+        https://www.elastic.co/guide/en/elasticsearch/reference/current/runtime.html
+        for more details.
+
+        Runtime fields are automatically added to the query response.
+
+        Example::
+
+            s = Search()
+            s = s.runtime_mappings(
+                client_ip={
+                    'type': 'ip',
+                    'script': '''
+                        String clientip=grok('%{COMMONAPACHELOG}').extract(doc["message"].value)?.clientip;
+                        if (clientip != null) emit(clientip);
+                    '''
+                }
+            )
+
+        """
+        s = self._clone()
+        s._runtime_mappings.update(kwargs)
+        s._fields.extend(k for k in s._runtime_mappings if k not in s._fields)
+        return s
+
+    def fields(self, *args):
+        """
+        Add field names to the 'fields' clause of the request body. Runtime fields are not indexed or stored, so
+        they will not appear in the _source block of a query; listing them here adds them to the response.
+
+        Example::
+
+            s = Search()
+            s = s.fields("client_ip")
+
+        """
+        s = self._clone()
+        s._fields.extend(args)
+        return s
+
     def source(self, fields=None, **kwargs):
         """
         Selectively control how the _source field is returned.
@@ -678,6 +728,12 @@ def to_dict(self, count=False, **kwargs):
             if self._script_fields:
                 d["script_fields"] = self._script_fields
 
+            if self._runtime_mappings:
+                d["runtime_mappings"] = self._runtime_mappings
+
+            if self._fields:
+                d["fields"] = self._fields
+
         d.update(recursive_to_dict(kwargs))
         return d
 
diff --git a/tests/test_integration/test_search.py b/tests/test_integration/test_search.py
index e3ce061eb..0679f1375 100644
--- a/tests/test_integration/test_search.py
+++ b/tests/test_integration/test_search.py
@@ -16,6 +16,8 @@
 #  under the License.
 
+from datetime import datetime
+
 from elasticsearch import TransportError
 from pytest import raises
 
@@ -167,3 +169,35 @@ def test_raw_subfield_can_be_used_in_aggs(data_client):
     authors = r.aggregations.authors
     assert 1 == len(authors)
     assert {"key": "Honza Král", "doc_count": 52} == authors[0]
+
+
+def test_runtime_field(data_client):
+    current_date = datetime.now()
+
+    s = Search(index="git").filter(Q("exists", field="committed_date"))
+    s = s.runtime_mappings(
+        days_since_commit={
+            "type": "long",
+            "script": {
+                "source": """
+                String currentDateStr = params.get('current_date');
+
+                DateTimeFormatter dtf = DateTimeFormatter.ISO_LOCAL_DATE_TIME;
+                LocalDateTime committedDate = doc['committed_date'].value.toLocalDateTime();
+                LocalDateTime currentDate = LocalDateTime.parse(currentDateStr, dtf);
+
+                emit(Duration.between(committedDate, currentDate).toDays());
+                """,
+                "params": {
+                    "current_date": current_date.replace(microsecond=0).isoformat()
+                },
+            },
+        }
+    )
+    response = s.execute()
+
+    for commit in response.hits:
+        assert "days_since_commit" in commit
+
+        committed_date = datetime.fromisoformat(commit.committed_date)
+        assert commit.days_since_commit[0] == (current_date - committed_date).days
diff --git a/tests/test_search.py b/tests/test_search.py
index 4da824182..7144e93a6 100644
--- a/tests/test_search.py
+++ b/tests/test_search.py
@@ -307,6 +307,12 @@ def test_complex_example():
         .filter(Q("term", category="meetup") | Q("term", category="conference"))
         .post_filter("terms", tags=["prague", "czech"])
         .script_fields(more_attendees="doc['attendees'].value + 42")
+        .runtime_mappings(
+            http={
+                "type": "composite",
+                "script": 'emit(grok("%{COMMONAPACHELOG}").extract(doc["message"].value))',
+            }
+        )
     )
 
     s.aggs.bucket("per_country", "terms", field="country").metric(
@@ -342,11 +348,18 @@ def test_complex_example():
                 "aggs": {"avg_attendees": {"avg": {"field": "attendees"}}},
             }
         },
+        "fields": ["http"],
         "highlight": {
             "order": "score",
             "fields": {"title": {"fragment_size": 50}, "body": {"fragment_size": 50}},
         },
         "script_fields": {"more_attendees": {"script": "doc['attendees'].value + 42"}},
+        "runtime_mappings": {
+            "http": {
+                "type": "composite",
+                "script": 'emit(grok("%{COMMONAPACHELOG}").extract(doc["message"].value))',
+            }
+        },
     } == s.to_dict()
 
 