diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 000000000..0e40fe8f5 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ + +# Default ignored files +/workspace.xml \ No newline at end of file diff --git a/.idea/.rakeTasks b/.idea/.rakeTasks new file mode 100644 index 000000000..c6865d9a1 --- /dev/null +++ b/.idea/.rakeTasks @@ -0,0 +1,7 @@ + + diff --git a/.idea/chewy.iml b/.idea/chewy.iml new file mode 100644 index 000000000..d7699d8dd --- /dev/null +++ b/.idea/chewy.iml @@ -0,0 +1,25 @@ + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 000000000..b0db9b0fc --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 000000000..e972d0c16 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 000000000..a65e67ed1 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 000000000..94a25f7f4 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/lib/chewy.rb b/lib/chewy.rb index a6239be31..d11e9f3a9 100644 --- a/lib/chewy.rb +++ b/lib/chewy.rb @@ -98,6 +98,9 @@ def derive_name(index_name) def client Chewy.current[:chewy_client] ||= begin client_configuration = configuration.deep_dup + if hosts + client_configuration[:hosts] = client_configuration[hosts] + end client_configuration.delete(:prefix) # used by Chewy, not relevant to Elasticsearch::Client block = client_configuration[:transport_options].try(:delete, :proc) ::Elasticsearch::Client.new(client_configuration, &block) @@ -119,6 +122,7 @@ def wait_for_status # Be careful, if current prefix is blank, this will destroy all the indexes. # def massacre + # TODO(Max): Add hosts to this client call Chewy.client.indices.delete(index: [Chewy.configuration[:prefix], '*'].reject(&:blank?).join('_')) Chewy.wait_for_status end diff --git a/lib/chewy/index.rb b/lib/chewy/index.rb index 4c63cd8e4..c4eb02f69 100644 --- a/lib/chewy/index.rb +++ b/lib/chewy/index.rb @@ -48,6 +48,8 @@ class Index self._default_import_options = {} class << self + attr_reader :hosts_name + # @overload index_name(suggest) # If suggested name is passed, it is set up as the new base name for # the index. Used for the index base name redefinition. @@ -92,6 +94,13 @@ def index_name(suggest = nil, prefix: nil, suffix: nil) end end + # Sets the hosts name of the index. If hosts_name is nil, use the default + # hosts in chewy.yml. Otherwise use the hosts with the specified name for + # indexing/queries. + def set_hosts_name(hosts_name) + @hosts_name = hosts_name + end + # Base name for the index. Uses the default value inferred from the # class name unless redefined. # diff --git a/lib/chewy/index/actions.rb b/lib/chewy/index/actions.rb index afc7debcc..baa3fb46b 100644 --- a/lib/chewy/index/actions.rb +++ b/lib/chewy/index/actions.rb @@ -12,7 +12,7 @@ module ClassMethods # UsersIndex.exists? #=> true # def exists? - client.indices.exists(index: index_name) + client(@hosts_name).indices.exists(index: index_name) end # Creates index and applies mappings and settings. @@ -59,7 +59,7 @@ def create!(suffix = nil, **options) body = specification_hash body[:aliases] = {general_name => {}} if options[:alias] && suffixed_name != general_name - result = client.indices.create(index: suffixed_name, body: body) + result = client(@hosts_name).indices.create(index: suffixed_name, body: body) Chewy.wait_for_status if result result @@ -164,13 +164,13 @@ def reset!(suffix = nil, apply_journal: true, journal: false, **import_options) original_index_settings suffixed_name delete if indexes.blank? - client.indices.update_aliases body: {actions: [ + client(@hosts_name).indices.update_aliases body: {actions: [ *indexes.map do |index| {remove: {index: index, alias: general_name}} end, {add: {index: suffixed_name, alias: general_name}} ]} - client.indices.delete index: indexes if indexes.present? + client(@hosts_name).indices.delete index: indexes if indexes.present? self.journal.apply(start_time, **import_options) if apply_journal result @@ -255,7 +255,7 @@ def original_index_settings(index_name) end def update_settings(index_name, **options) - client.indices.put_settings index: index_name, body: {index: options[:settings]} + client(@hosts_name).indices.put_settings index: index_name, body: {index: options[:settings]} end def index_settings(setting_name) diff --git a/lib/chewy/query.rb b/lib/chewy/query.rb new file mode 100644 index 000000000..cd22debf9 --- /dev/null +++ b/lib/chewy/query.rb @@ -0,0 +1,1142 @@ +require 'chewy/query/criteria' +require 'chewy/query/filters' +require 'chewy/query/loading' +require 'chewy/query/pagination' + +module Chewy + # Query allows you to create ES search requests with convenient + # chainable DSL. Queries are lazy evaluated and might be merged. + # The same DSL is used for whole index or individual types query build. + # + # @example + # UsersIndex.filter{ age < 42 }.query(text: {name: 'Alex'}).limit(20) + # UsersIndex::User.filter{ age < 42 }.query(text: {name: 'Alex'}).limit(20) + # + class Query + include Enumerable + include Loading + include Pagination + include Chewy::Search::Scoping + + DELEGATED_METHODS = %i[ + explain query_mode filter_mode post_filter_mode + timeout limit offset highlight min_score rescore facets script_score + boost_factor weight random_score field_value_factor decay aggregations + suggest none strategy query filter post_filter boost_mode + score_mode order reorder only types delete_all find total + total_count total_entries unlimited script_fields track_scores preference + ].to_set.freeze + + delegate :each, :count, :size, to: :_collection + alias_method :to_ary, :to_a + + attr_reader :_indexes, :_types, :options, :criteria + + def initialize(*indexes_or_types_and_options) + @options = indexes_or_types_and_options.extract_options! + @_types = indexes_or_types_and_options.select { |klass| klass < Chewy::Type } + @_indexes = indexes_or_types_and_options.select { |klass| klass < Chewy::Index } + @_indexes |= @_types.map(&:index) + @criteria = Criteria.new + end + + # A compatibility layer with the new request DSL. + def render + _request + end + + # Comparation with other query or collection + # If other is collection - search request is executed and + # result is used for comparation + # + # @example + # UsersIndex.filter(term: {name: 'Johny'}) == UsersIndex.filter(term: {name: 'Johny'}) # => true + # UsersIndex.filter(term: {name: 'Johny'}) == UsersIndex.filter(term: {name: 'Johny'}).to_a # => true + # UsersIndex.filter(term: {name: 'Johny'}) == UsersIndex.filter(term: {name: 'Winnie'}) # => false + # + def ==(other) + super || other.is_a?(self.class) ? other.criteria == criteria : other == to_a + end + + # Adds `explain` parameter to search request. + # + # @example + # UsersIndex.filter(term: {name: 'Johny'}).explain + # UsersIndex.filter(term: {name: 'Johny'}).explain(true) + # UsersIndex.filter(term: {name: 'Johny'}).explain(false) + # + # Calling explain without any arguments sets explanation flag to true. + # With `explain: true`, every result object has `_explanation` + # method + # + # @example + # UsersIndex::User.filter(term: {name: 'Johny'}).explain.first._explanation # => {...} + # + def explain(value = nil) + chain { criteria.update_request_options explain: (value.nil? ? true : value) } + end + + # Adds `script_fields` parameter to search request. + # + # @example + # UsersIndex.script_fields( + # distance: { + # params: { + # lat: 37.569976, + # lon: -122.351591 + # }, + # script: "doc['coordinates'].distanceInMiles(lat, lon)" + # } + # ) + def script_fields(value) + chain { criteria.update_script_fields(value) } + end + + # Sets query compilation mode for search request. + # Not used if only one filter for search is specified. + # Possible values: + # + # * `:must` + # Default value. Query compiles into a bool `must` query. + # + # @example + # UsersIndex.query(text: {name: 'Johny'}).query(range: {age: {lte: 42}}) + # # => {body: { + # query: {bool: {must: [{text: {name: 'Johny'}}, {range: {age: {lte: 42}}}]}} + # }} + # + # * `:should` + # Query compiles into a bool `should` query. + # + # @example + # UsersIndex.query(text: {name: 'Johny'}).query(range: {age: {lte: 42}}).query_mode(:should) + # # => {body: { + # query: {bool: {should: [{text: {name: 'Johny'}}, {range: {age: {lte: 42}}}]}} + # }} + # + # * Any acceptable `minimum_should_match` value (1, '2', '75%') + # Query compiles into a bool `should` query with `minimum_should_match` set. + # + # @example + # UsersIndex.query(text: {name: 'Johny'}).query(range: {age: {lte: 42}}).query_mode('50%') + # # => {body: { + # query: {bool: { + # should: [{text: {name: 'Johny'}}, {range: {age: {lte: 42}}}], + # minimum_should_match: '50%' + # }} + # }} + # + # * `:dis_max` + # Query compiles into a `dis_max` query. + # + # @example + # UsersIndex.query(text: {name: 'Johny'}).query(range: {age: {lte: 42}}).query_mode(:dis_max) + # # => {body: { + # query: {dis_max: {queries: [{text: {name: 'Johny'}}, {range: {age: {lte: 42}}}]}} + # }} + # + # * Any Float value (0.0, 0.7, 1.0) + # Query compiles into a `dis_max` query with `tie_breaker` option set. + # + # @example + # UsersIndex.query(text: {name: 'Johny'}).query(range: {age: {lte: 42}}).query_mode(0.7) + # # => {body: { + # query: {dis_max: { + # queries: [{text: {name: 'Johny'}}, {range: {age: {lte: 42}}}], + # tie_breaker: 0.7 + # }} + # }} + # + # Default value for `:query_mode` might be changed + # with `Chewy.query_mode` config option. + # + # @example + # Chewy.query_mode = :dis_max + # Chewy.query_mode = '50%' + # + def query_mode(value) + chain { criteria.update_options query_mode: value } + end + + # Sets query compilation mode for search request. + # Not used if only one filter for search is specified. + # Possible values: + # + # * `:and` + # Default value. Filter compiles into an `and` filter. + # + # @example + # UsersIndex.filter{ name == 'Johny' }.filter{ age <= 42 } + # # => {body: {query: {filtered: { + # query: {...}, + # filter: {and: [{term: {name: 'Johny'}}, {range: {age: {lte: 42}}}]} + # }}}} + # + # * `:or` + # Filter compiles into an `or` filter. + # + # @example + # UsersIndex.filter{ name == 'Johny' }.filter{ age <= 42 }.filter_mode(:or) + # # => {body: {query: {filtered: { + # query: {...}, + # filter: {or: [{term: {name: 'Johny'}}, {range: {age: {lte: 42}}}]} + # }}}} + # + # * `:must` + # Filter compiles into a bool `must` filter. + # + # @example + # UsersIndex.filter{ name == 'Johny' }.filter{ age <= 42 }.filter_mode(:must) + # # => {body: {query: {filtered: { + # query: {...}, + # filter: {bool: {must: [{term: {name: 'Johny'}}, {range: {age: {lte: 42}}}]}} + # }}}} + # + # * `:should` + # Filter compiles into a bool `should` filter. + # + # @example + # UsersIndex.filter{ name == 'Johny' }.filter{ age <= 42 }.filter_mode(:should) + # # => {body: {query: {filtered: { + # query: {...}, + # filter: {bool: {should: [{term: {name: 'Johny'}}, {range: {age: {lte: 42}}}]}} + # }}}} + # + # * Any acceptable `minimum_should_match` value (1, '2', '75%') + # Filter compiles into bool `should` filter with `minimum_should_match` set. + # + # @example + # UsersIndex.filter{ name == 'Johny' }.filter{ age <= 42 }.filter_mode('50%') + # # => {body: {query: {filtered: { + # query: {...}, + # filter: {bool: { + # should: [{term: {name: 'Johny'}}, {range: {age: {lte: 42}}}], + # minimum_should_match: '50%' + # }} + # }}}} + # + # Default value for `:filter_mode` might be changed + # with `Chewy.filter_mode` config option. + # + # @example + # Chewy.filter_mode = :should + # Chewy.filter_mode = '50%' + # + def filter_mode(value) + chain { criteria.update_options filter_mode: value } + end + + # Acts the same way as `filter_mode`, but used for `post_filter`. + # Note that it fallbacks by default to `Chewy.filter_mode` if + # `Chewy.post_filter_mode` is nil. + # + # @example + # UsersIndex.post_filter{ name == 'Johny' }.post_filter{ age <= 42 }.post_filter_mode(:and) + # UsersIndex.post_filter{ name == 'Johny' }.post_filter{ age <= 42 }.post_filter_mode(:should) + # UsersIndex.post_filter{ name == 'Johny' }.post_filter{ age <= 42 }.post_filter_mode('50%') + # + def post_filter_mode(value) + chain { criteria.update_options post_filter_mode: value } + end + + # A search timeout, bounding the search request to be executed within the + # specified time value and bail with the hits accumulated up to that point + # when expired. Defaults to no timeout. + # + # By default, the coordinating node waits to receive a response from all + # shards. If one node is having trouble, it could slow down the response to + # all search requests. + # + # The timeout parameter tells the coordinating node how long it should wait + # before giving up and just returning the results that it already has. It + # can be better to return some results than none at all. + # + # The response to a search request will indicate whether the search timed + # out and how many shards responded successfully: + # + # @example + # ... + # "timed_out": true, + # "_shards": { + # "total": 5, + # "successful": 4, + # "failed": 1 + # }, + # ... + # + # The primary shard assigned to perform the index operation might not be + # available when the index operation is executed. Some reasons for this + # might be that the primary shard is currently recovering from a gateway or + # undergoing relocation. By default, the index operation will wait on the + # primary shard to become available for up to 1 minute before failing and + # responding with an error. The timeout parameter can be used to explicitly + # specify how long it waits. + # + # @example + # UsersIndex.timeout("5000ms") + # + # Timeout is not a circuit breaker. + # + # It should be noted that this timeout does not halt the execution of the + # query, it merely tells the coordinating node to return the results + # collected so far and to close the connection. In the background, other + # shards may still be processing the query even though results have been + # sent. + # + # Use the timeout because it is important to your SLA, not because you want + # to abort the execution of long running queries. + # + def timeout(value) + chain { criteria.update_request_options timeout: value } + end + + # Sets elasticsearch `size` search request param + # Default value is set in the elasticsearch and is 10. + # + # @example + # UsersIndex.filter{ name == 'Johny' }.limit(100) + # # => {body: { + # query: {...}, + # size: 100 + # }} + # + def limit(value = nil, &block) + chain { criteria.update_request_options size: block || Integer(value) } + end + + # Sets elasticsearch `from` search request param + # + # @example + # UsersIndex.filter{ name == 'Johny' }.offset(300) + # # => {body: { + # query: {...}, + # from: 300 + # }} + # + def offset(value = nil, &block) + chain { criteria.update_request_options from: block || Integer(value) } + end + + # Elasticsearch highlight query option support + # + # @example + # UsersIndex.query(...).highlight(fields: { ... }) + # + def highlight(value) + chain { criteria.update_request_options highlight: value } + end + + # Elasticsearch rescore query option support + # + # @example + # UsersIndex.query(...).rescore(query: { ... }) + # + def rescore(value) + chain { criteria.update_request_options rescore: value } + end + + # Elasticsearch minscore option support + # + # @example + # UsersIndex.query(...).min_score(0.5) + # + def min_score(value) + chain { criteria.update_request_options min_score: value } + end + + # Elasticsearch track_scores option support + # + # @example + # UsersIndex.query(...).track_scores(true) + # + def track_scores(value) + chain { criteria.update_request_options track_scores: value } + end + + # Adds facets section to the search request. + # All the chained facets a merged and added to the + # search request + # + # @example + # UsersIndex.facets(tags: {terms: {field: 'tags'}}).facets(ages: {terms: {field: 'age'}}) + # # => {body: { + # query: {...}, + # facets: {tags: {terms: {field: 'tags'}}, ages: {terms: {field: 'age'}}} + # }} + # + # If called parameterless - returns result facets from ES performing request. + # Returns empty hash if no facets was requested or resulted. + # + def facets(params = nil) + raise RemovedFeature, 'removed in elasticsearch 2.0' if Runtime.version >= '2.0' + if params + chain { criteria.update_facets params } + else + _response['facets'] || {} + end + end + + # Adds a script function to score the search request. All scores are + # added to the search request and combinded according to + # `boost_mode` and `score_mode` + # + # @example + # UsersIndex.script_score("doc['boost'].value", params: { modifier: 2 }) + # # => {body: + # query: { + # function_score: { + # query: { ...}, + # functions: [{ + # script_score: { + # script: "doc['boost'].value * modifier", + # params: { modifier: 2 } + # } + # } + # }] + # } } } + def script_score(script, options = {}) + scoring = {script_score: {script: script}.merge(options)} + chain { criteria.update_scores scoring } + end + + # Adds a boost factor to the search request. All scores are + # added to the search request and combinded according to + # `boost_mode` and `score_mode` + # + # This probably only makes sense if you specify a filter + # for the boost factor as well + # + # @example + # UsersIndex.boost_factor(23, filter: { term: { foo: :bar} }) + # # => {body: + # query: { + # function_score: { + # query: { ...}, + # functions: [{ + # boost_factor: 23, + # filter: { term: { foo: :bar } } + # }] + # } } } + def boost_factor(factor, options = {}) + scoring = options.merge(boost_factor: factor.to_i) + chain { criteria.update_scores scoring } + end + + # Add a weight scoring function to the search. All scores are + # added to the search request and combinded according to + # `boost_mode` and `score_mode` + # + # This probably only makes sense if you specify a filter + # for the weight as well. + # + # @example + # UsersIndex.weight(23, filter: { term: { foo: :bar} }) + # # => {body: + # query: { + # function_score: { + # query: { ...}, + # functions: [{ + # weight: 23, + # filter: { term: { foo: :bar } } + # }] + # } } } + def weight(factor, options = {}) + scoring = options.merge(weight: factor.to_i) + chain { criteria.update_scores scoring } + end + + # Adds a random score to the search request. All scores are + # added to the search request and combinded according to + # `boost_mode` and `score_mode` + # + # This probably only makes sense if you specify a filter + # for the random score as well. + # + # If you do not pass in a seed value, Time.now will be used + # + # @example + # UsersIndex.random_score(23, filter: { foo: :bar}) + # # => {body: + # query: { + # function_score: { + # query: { ...}, + # functions: [{ + # random_score: { seed: 23 }, + # filter: { foo: :bar } + # }] + # } } } + def random_score(seed = Time.now, options = {}) + scoring = options.merge(random_score: {seed: seed.to_i}) + chain { criteria.update_scores scoring } + end + + # Add a field value scoring to the search. All scores are + # added to the search request and combinded according to + # `boost_mode` and `score_mode` + # + # This function is only available in Elasticsearch 1.2 and + # greater + # + # @example + # UsersIndex.field_value_factor( + # { + # field: :boost, + # factor: 1.2, + # modifier: :sqrt + # }, filter: { foo: :bar}) + # # => {body: + # query: { + # function_score: { + # query: { ...}, + # functions: [{ + # field_value_factor: { + # field: :boost, + # factor: 1.2, + # modifier: :sqrt + # }, + # filter: { foo: :bar } + # }] + # } } } + def field_value_factor(settings, options = {}) + scoring = options.merge(field_value_factor: settings) + chain { criteria.update_scores scoring } + end + + # Add a decay scoring to the search. All scores are + # added to the search request and combinded according to + # `boost_mode` and `score_mode` + # + # The parameters have default values, but those may not + # be very useful for most applications. + # + # @example + # UsersIndex.decay( + # :gauss, + # :field, + # origin: '11, 12', + # scale: '2km', + # offset: '5km', + # decay: 0.4, + # filter: { foo: :bar}) + # # => {body: + # query: { + # gauss: { + # query: { ...}, + # functions: [{ + # gauss: { + # field: { + # origin: '11, 12', + # scale: '2km', + # offset: '5km', + # decay: 0.4 + # } + # }, + # filter: { foo: :bar } + # }] + # } } } + def decay(function, field, options = {}) + field_options = options.extract!(:origin, :scale, :offset, :decay).delete_if { |_, v| v.nil? } + scoring = options.merge(function => { + field => field_options + }) + chain { criteria.update_scores scoring } + end + + # Sets `preference` for request. + # For instance, one can use `preference=_primary` to execute only on the primary shards. + # + # @example + # scope = UsersIndex.preference(:_primary) + # + def preference(value) + chain { criteria.update_search_options preference: value } + end + + # Sets elasticsearch `aggregations` search request param + # + # @example + # UsersIndex.filter{ name == 'Johny' }.aggregations(category_id: {terms: {field: 'category_ids'}}) + # # => {body: { + # query: {...}, + # aggregations: { + # terms: { + # field: 'category_ids' + # } + # } + # }} + # + def aggregations(params = nil) + @_named_aggs ||= _build_named_aggs + @_fully_qualified_named_aggs ||= _build_fqn_aggs + if params + params = {params => @_named_aggs[params]} if params.is_a?(Symbol) + params = {params => _get_fully_qualified_named_agg(params)} if params.is_a?(String) && params =~ /\A\S+#\S+\.\S+\z/ + chain { criteria.update_aggregations params } + else + _response['aggregations'] || {} + end + end + alias_method :aggs, :aggregations + + # In this simplest of implementations each named aggregation must be uniquely named + def _build_named_aggs + named_aggs = {} + @_indexes.each do |index| + index.types.each do |type| + type._agg_defs.each do |agg_name, prc| + named_aggs[agg_name] = prc.call + end + end + end + named_aggs + end + + def _build_fqn_aggs + named_aggs = {} + @_indexes.each do |index| + named_aggs[index.to_s.downcase] ||= {} + index.types.each do |type| + named_aggs[index.to_s.downcase][type.to_s.downcase] ||= {} + type._agg_defs.each do |agg_name, prc| + named_aggs[index.to_s.downcase][type.to_s.downcase][agg_name.to_s.downcase] = prc.call + end + end + end + named_aggs + end + + def _get_fully_qualified_named_agg(str) + parts = str.scan(/\A(\S+)#(\S+)\.(\S+)\z/).first + idx = "#{parts[0]}index" + type = "#{idx}::#{parts[1]}" + agg_name = parts[2] + @_fully_qualified_named_aggs[idx][type][agg_name] + end + + # Sets elasticsearch `suggest` search request param + # + # @example + # UsersIndex.suggest(name: {text: 'Joh', term: {field: 'name'}}) + # # => {body: { + # query: {...}, + # suggest: { + # text: 'Joh', + # term: { + # field: 'name' + # } + # } + # }} + # + def suggest(params = nil) + if params + chain { criteria.update_suggest params } + else + _response['suggest'] || {} + end + end + + # Marks the criteria as having zero documents. This scope always returns empty array + # without touching the elasticsearch server. + # All the chained calls of methods don't affect the result + # + # @example + # UsersIndex.none.to_a + # # => [] + # UsersIndex.query(text: {name: 'Johny'}).none.to_a + # # => [] + # UsersIndex.none.query(text: {name: 'Johny'}).to_a + # # => [] + # + def none + chain { criteria.update_options none: true } + end + + # Setups strategy for top-level filtered query + # + # @example + # UsersIndex.filter { name == 'Johny'}.strategy(:leap_frog) + # # => {body: { + # query: { filtered: { + # filter: { term: { name: 'Johny' } }, + # strategy: 'leap_frog' + # } } + # }} + # + def strategy(value = nil) + chain { criteria.update_options strategy: value } + end + + # Adds one or more query to the search request + # Internally queries are stored as an array + # While the full query compilation this array compiles + # according to `:query_mode` option value + # + # By default it joines inside `must` query + # See `#query_mode` chainable method for more info. + # + # @example + # UsersIndex.query(text: {name: 'Johny'}).query(range: {age: {lte: 42}}) + # UsersIndex::User.query(text: {name: 'Johny'}).query(range: {age: {lte: 42}}) + # # => {body: { + # query: {bool: {must: [{text: {name: 'Johny'}}, {range: {age: {lte: 42}}}]}} + # }} + # + # If only one query was specified, it will become a result + # query as is, without joining. + # + # @example + # UsersIndex.query(text: {name: 'Johny'}) + # # => {body: { + # query: {text: {name: 'Johny'}} + # }} + # + def query(params) + chain { criteria.update_queries params } + end + + # Adds one or more filter to the search request + # Internally filters are stored as an array + # While the full query compilation this array compiles + # according to `:filter_mode` option value + # + # By default it joins inside `and` filter + # See `#filter_mode` chainable method for more info. + # + # Also this method supports block DSL. + # See `Chewy::Query::Filters` for more info. + # + # @example + # UsersIndex.filter(term: {name: 'Johny'}).filter(range: {age: {lte: 42}}) + # UsersIndex::User.filter(term: {name: 'Johny'}).filter(range: {age: {lte: 42}}) + # UsersIndex.filter{ name == 'Johny' }.filter{ age <= 42 } + # # => {body: {query: {filtered: { + # query: {...}, + # filter: {and: [{term: {name: 'Johny'}}, {range: {age: {lte: 42}}}]} + # }}}} + # + # If only one filter was specified, it will become a result + # filter as is, without joining. + # + # @example + # UsersIndex.filter(term: {name: 'Johny'}) + # # => {body: {query: {filtered: { + # query: {...}, + # filter: {term: {name: 'Johny'}} + # }}}} + # + def filter(params = nil, &block) + params = Filters.new(&block).__render__ if block + chain { criteria.update_filters params } + end + + # Adds one or more post_filter to the search request + # Internally post_filters are stored as an array + # While the full query compilation this array compiles + # according to `:post_filter_mode` option value + # + # By default it joins inside `and` filter + # See `#post_filter_mode` chainable method for more info. + # + # Also this method supports block DSL. + # See `Chewy::Query::Filters` for more info. + # + # @example + # UsersIndex.post_filter(term: {name: 'Johny'}).post_filter(range: {age: {lte: 42}}) + # UsersIndex::User.post_filter(term: {name: 'Johny'}).post_filter(range: {age: {lte: 42}}) + # UsersIndex.post_filter{ name == 'Johny' }.post_filter{ age <= 42 } + # # => {body: { + # post_filter: {and: [{term: {name: 'Johny'}}, {range: {age: {lte: 42}}}]} + # }} + # + # If only one post_filter was specified, it will become a result + # post_filter as is, without joining. + # + # @example + # UsersIndex.post_filter(term: {name: 'Johny'}) + # # => {body: { + # post_filter: {term: {name: 'Johny'}} + # }} + # + def post_filter(params = nil, &block) + params = Filters.new(&block).__render__ if block + chain { criteria.update_post_filters params } + end + + # Sets the boost mode for custom scoring/boosting. + # Not used if no score functions are specified + # Possible values: + # + # * `:multiply` + # Default value. Query score and function result are multiplied. + # + # @example + # UsersIndex.boost_mode('multiply').script_score('doc['boost'].value') + # # => {body: {query: function_score: { + # query: {...}, + # boost_mode: 'multiply', + # functions: [ ... ] + # }}} + # + # * `:replace` + # Only function result is used, query score is ignored. + # + # * `:sum` + # Query score and function score are added. + # + # * `:avg` + # Average of query and function score. + # + # * `:max` + # Max of query and function score. + # + # * `:min` + # Min of query and function score. + # + # Default value for `:boost_mode` might be changed + # with `Chewy.score_mode` config option. + def boost_mode(value) + chain { criteria.update_options boost_mode: value } + end + + # Sets the scoring mode for combining function scores/boosts + # Not used if no score functions are specified. + # Possible values: + # + # * `:multiply` + # Default value. Scores are multiplied. + # + # @example + # UsersIndex.score_mode('multiply').script_score('doc['boost'].value') + # # => {body: {query: function_score: { + # query: {...}, + # score_mode: 'multiply', + # functions: [ ... ] + # }}} + # + # * `:sum` + # Scores are summed. + # + # * `:avg` + # Scores are averaged. + # + # * `:first` + # The first function that has a matching filter is applied. + # + # * `:max` + # Maximum score is used. + # + # * `:min` + # Minimum score is used + # + # Default value for `:score_mode` might be changed + # with `Chewy.score_mode` config option. + # + # @example + # Chewy.score_mode = :first + # + def score_mode(value) + chain { criteria.update_options score_mode: value } + end + + # Sets search request sorting + # + # @example + # UsersIndex.order(:first_name, :last_name).order(age: :desc).order(price: {order: :asc, mode: :avg}) + # # => {body: { + # query: {...}, + # sort: ['first_name', 'last_name', {age: 'desc'}, {price: {order: 'asc', mode: 'avg'}}] + # }} + # + def order(*params) + chain { criteria.update_sort params } + end + + # Cleans up previous search sorting and sets the new one + # + # @example + # UsersIndex.order(:first_name, :last_name).order(age: :desc).reorder(price: {order: :asc, mode: :avg}) + # # => {body: { + # query: {...}, + # sort: [{price: {order: 'asc', mode: 'avg'}}] + # }} + # + def reorder(*params) + chain { criteria.update_sort params, purge: true } + end + + # Sets search request field list + # + # @example + # UsersIndex.only(:first_name, :last_name).only(:age) + # # => {body: { + # query: {...}, + # fields: ['first_name', 'last_name', 'age'] + # }} + # + def only(*params) + chain { criteria.update_fields params } + end + + # Cleans up previous search field list and sets the new one + # + # @example + # UsersIndex.only(:first_name, :last_name).only!(:age) + # # => {body: { + # query: {...}, + # fields: ['age'] + # }} + # + def only!(*params) + chain { criteria.update_fields params, purge: true } + end + + # Specify types participating in the search result + # Works via `types` filter. Always merged with another filters + # with the `and` filter. + # + # @example + # UsersIndex.types(:admin, :manager).filters{ name == 'Johny' }.filters{ age <= 42 } + # # => {body: {query: {filtered: { + # query: {...}, + # filter: {and: [ + # {or: [ + # {type: {value: 'admin'}}, + # {type: {value: 'manager'}} + # ]}, + # {term: {name: 'Johny'}}, + # {range: {age: {lte: 42}}} + # ]} + # }}}} + # + # UsersIndex.types(:admin, :manager).filters{ name == 'Johny' }.filters{ age <= 42 }.filter_mode(:or) + # # => {body: {query: {filtered: { + # query: {...}, + # filter: {and: [ + # {or: [ + # {type: {value: 'admin'}}, + # {type: {value: 'manager'}} + # ]}, + # {or: [ + # {term: {name: 'Johny'}}, + # {range: {age: {lte: 42}}} + # ]} + # ]} + # }}}} + # + def types(*params) + chain { criteria.update_types params } + end + + # Acts the same way as `types`, but cleans up previously set types + # + # @example + # UsersIndex.types(:admin).types!(:manager) + # # => {body: {query: {filtered: { + # query: {...}, + # filter: {type: {value: 'manager'}} + # }}}} + # + def types!(*params) + chain { criteria.update_types params, purge: true } + end + + # Sets `search_type` for request. + # For instance, one can use `search_type=count` to fetch only total count of documents or to fetch only aggregations without fetching documents. + # + # @example + # scope = UsersIndex.search_type(:count) + # scope.count == 0 # no documents actually fetched + # scope.total == 10 # but we know a total count of them + # + # scope = UsersIndex.aggs(max_age: { max: { field: 'age' } }).search_type(:count) + # max_age = scope.aggs['max_age']['value'] + # + def search_type(value) + chain { criteria.update_search_options search_type: value } + end + + # Merges two queries. + # Merges all the values in criteria with the same rules as values added manually. + # + # @example + # scope1 = UsersIndex.filter{ name == 'Johny' } + # scope2 = UsersIndex.filter{ age <= 42 } + # scope3 = UsersIndex.filter{ name == 'Johny' }.filter{ age <= 42 } + # + # scope1.merge(scope2) == scope3 # => true + # + def merge(other) + chain { criteria.merge!(other.criteria) } + end + + # Deletes all documents matching a query. + # + # @example + # UsersIndex.delete_all + # UsersIndex.filter{ age <= 42 }.delete_all + # UsersIndex::User.delete_all + # UsersIndex::User.filter{ age <= 42 }.delete_all + # + def delete_all + if Runtime.version >= '2.0' + plugins = Chewy.client(_indexes.first.hosts_name).nodes.info(plugins: true)['nodes'].values.map { |item| item['plugins'] }.flatten + raise PluginMissing, 'install delete-by-query plugin' unless plugins.find { |item| item['name'] == 'delete-by-query' } + end + + request = chain { criteria.update_options simple: true }.send(:_request) + + ActiveSupport::Notifications.instrument 'delete_query.chewy', + request: request, indexes: _indexes, types: _types, + index: _indexes.one? ? _indexes.first : _indexes, + type: _types.one? ? _types.first : _types do + if Runtime.version >= '2.0' + path = Elasticsearch::API::Utils.__pathify( + Elasticsearch::API::Utils.__listify(request[:index]), + Elasticsearch::API::Utils.__listify(request[:type]), + '/_query' + ) + Chewy.client(_indexes.first.hosts_name).perform_request(Elasticsearch::API::HTTP_DELETE, path, {}, request[:body]).body + else + Chewy.client(_indexes.first.hosts_name).delete_by_query(request) + end + end + end + + # Find all documents matching a query. + # + # @example + # UsersIndex.find(42) + # UsersIndex.filter{ age <= 42 }.find(42) + # UsersIndex::User.find(42) + # UsersIndex::User.filter{ age <= 42 }.find(42) + # + # In all the previous examples find will return a single object. + # To get a collection - pass an array of ids. + # + # @example + # UsersIndex::User.find(42, 7, 3) # array of objects with ids in [42, 7, 3] + # UsersIndex::User.find([8, 13]) # array of objects with ids in [8, 13] + # UsersIndex::User.find([42]) # array of the object with id == 42 + # + def find(*ids) + results = chain { criteria.update_options simple: true }.filter { _id == ids.flatten }.to_a + + raise Chewy::DocumentNotFound, "Could not find documents for ids #{ids.flatten}" if results.empty? + ids.one? && !ids.first.is_a?(Array) ? results.first : results + end + + # Returns true if there are at least one document that matches the query + # + # @example + # PlacesIndex.query(...).filter(...).exists? + # + def exists? + search_type(:count).total > 0 + end + + # Sets limit to be equal to total documents count + # + # @example + # PlacesIndex.query(...).filter(...).unlimited + # + + def unlimited + count_query = search_type(:count) + offset(0).limit { count_query.total } + end + + # Returns request total time elapsed as reported by elasticsearch + # + # @example + # UsersIndex.query(...).filter(...).took + # + def took + _response['took'] + end + + # Returns request timed_out as reported by elasticsearch + # + # The timed_out value tells us whether the query timed out or not. + # + # By default, search requests do not timeout. If low response times are more + # important to you than complete results, you can specify a timeout as 10 or + # "10ms" (10 milliseconds), or "1s" (1 second). See #timeout method. + # + # @example + # UsersIndex.query(...).filter(...).timed_out + # + def timed_out + _response['timed_out'] + end + + protected + + def initialize_clone(origin) + @criteria = origin.criteria.clone + reset + end + + private + + def chain(&block) + clone.tap { |q| q.instance_exec(&block) } + end + + def reset + @_request, @_response, @_results, @_collection = nil + end + + def _request + @_request ||= begin + request = criteria.request_body + request[:index] = _indexes_hash.keys + request[:type] = _types.map(&:type_name) + request + end + end + + def _response + @_response ||= ActiveSupport::Notifications.instrument 'search_query.chewy', + request: _request, indexes: _indexes, types: _types, + index: _indexes.one? ? _indexes.first : _indexes, + type: _types.one? ? _types.first : _types do + begin + Chewy.client(_indexes.first.hosts_name).search(_request) + rescue Elasticsearch::Transport::Transport::Errors::NotFound => e + raise e if e.message !~ /IndexMissingException/ && e.message !~ /index_not_found_exception/ + {} + end + end + end + + def _results + @_results ||= (criteria.none? || _response == {} ? [] : _response['hits']['hits']).map do |hit| + _derive_type(hit['_index'], hit['_type']).build(hit) + end + end + + def _collection + @_collection ||= begin + _load_objects! if criteria.options[:preload] + if criteria.options[:preload] && criteria.options[:loaded_objects] + _results.map(&:_object) + else + _results + end + end + end + + def _derive_type(index, type) + (@types_cache ||= {})[[index, type]] ||= _derive_index(index).type(type) + end + + def _derive_index(index_name) + (@derive_index ||= {})[index_name] ||= _indexes_hash[index_name] || + _indexes_hash[_indexes_hash.keys.sort_by(&:length).reverse.detect { |name| index_name.start_with?(name) }] + end + + def _indexes_hash + @_indexes_hash ||= _indexes.index_by(&:index_name) + end + end +end diff --git a/lib/chewy/query/loading.rb b/lib/chewy/query/loading.rb new file mode 100644 index 000000000..73ed1238c --- /dev/null +++ b/lib/chewy/query/loading.rb @@ -0,0 +1,110 @@ +module Chewy + class Query + module Loading + extend ActiveSupport::Concern + + # Lazily loads actual ORM/ODM objects for search result. + # Returns scope marked to return loaded objects array instead of + # chewy wrappers. In case when object can not be loaded because it + # was deleted or don't satisfy given scope or options - the + # result collection will contain nil value in the place of this + # object. Use `compact` method to avoid this if necessary. + # + # UsersIndex.query(...).load #=> [#, ...] + # UsersIndex.query(...).load.filter(...) #=> [#, ...] + # + # Possible options: + # + # :scope - used to give a scope for _every_ loaded type. + # + # PlacesIndex.query(...).load(scope: ->{ includes(:testimonials) }) + # + # If places here contain cities and countries then preload will be + # done like this: + # + # City.where(id: [...]).includes(:testimonials) + # Country.where(id: [...]).includes(:testimonials) + # + # It is also possible to pass own scope for every loaded type: + # + # PlacesIndex.query(...).load( + # city: { scope: ->{ includes(:testimonials, :country) }} + # country: { scope: ->{ includes(:testimonials, :cities) }} + # ) + # + # And loading will be performed as: + # + # City.where(id: [...]).includes(:testimonials, :country) + # Country.where(id: [...]).includes(:testimonials, :cities) + # + # In case of ActiveRecord objects loading the same result + # will be reached using ActiveRecord scopes instead of + # lambdas. But it works only with per-type scopes, + # and doesn't work with the common scope. + # + # PlacesIndex.query(...).load( + # city: { scope: City.includes(:testimonials, :country) } + # country: { scope: Country.includes(:testimonials, :cities) } + # ) + # + # :only - loads objects for the specified types + # + # PlacesIndex.query(...).load(only: :city) + # PlacesIndex.query(...).load(only: [:city]) + # PlacesIndex.query(...).load(only: [:city, :country]) + # + # :except - doesn't load listed types + # + # PlacesIndex.query(...).load(except: :city) + # PlacesIndex.query(...).load(except: [:city]) + # PlacesIndex.query(...).load(except: [:city, :country]) + # + def load(options = {}) + chain { criteria.update_options preload: options, loaded_objects: true } + end + + # This methods is just convenient way to preload some ORM/ODM + # objects and continue to work with Chewy wrappers. Returns + # Chewy query scope. Note that `load` method performs ES request + # so preload method should also be the last in scope methods chain. + # Takes the same options as the `load` method + # + # PlacesIndex.query(...).preload(only: :city) + # + # Loaded objects are also attached to corresponding Chewy + # type wrapper objects and available with `_object` accessor. + # + # scope = PlacesIndex.query(...) + # preload_scope = scope.preload + # preload_scope.first #=> PlacesIndex::City wrapper instance + # preload_scope.first._object #=> City model instance + # scope.load == preload_scope.map(&:_object) #=> true + # + def preload(options = {}) + chain { criteria.update_options preload: options, loaded_objects: false } + end + + private + + def _load_objects! + options = criteria.options[:preload] + only = Array.wrap(options[:only]).map(&:to_s) + except = Array.wrap(options[:except]).map(&:to_s) + + loaded_objects = Hash[_results.group_by(&:class).map do |type, objects| + next if except.include?(type.type_name) + next if only.present? && !only.include?(type.type_name) + loaded = type.adapter.load(objects.map(&:id), **options.merge(_type: type)) || objects + [type, loaded.index_by.with_index do |loaded_object, i| + objects[i]._object = loaded_object + objects[i] + end] + end.compact] + + _results.map do |result| + loaded_objects[result.class][result] if loaded_objects[result.class] + end + end + end + end +end diff --git a/lib/chewy/query/pagination.rb b/lib/chewy/query/pagination.rb new file mode 100644 index 000000000..72ee7f5ea --- /dev/null +++ b/lib/chewy/query/pagination.rb @@ -0,0 +1,25 @@ +module Chewy + class Query + module Pagination + # Returns request total found documents count + # + # PlacesIndex.query(...).filter(...).total + # + def total + _response['hits']['total'] ? _response['hits']['total']['value'] || 0 : 0 + end + alias_method :total_count, :total + alias_method :total_entries, :total + + private + + def raw_limit_value + criteria.request_options[:size] + end + + def raw_offset_value + criteria.request_options[:from] + end + end + end +end diff --git a/lib/chewy/search/parameters.rb b/lib/chewy/search/parameters.rb index 1f15e6168..6fed04360 100644 --- a/lib/chewy/search/parameters.rb +++ b/lib/chewy/search/parameters.rb @@ -104,8 +104,8 @@ def merge!(other) # Renders and merges all the parameter storages into a single hash. # # @return [Hash] request body - def render - render_query_string_params.merge(render_body) + def render(replace_post_filter: false) + render_query_string_params.merge(render_body(replace_post_filter: replace_post_filter)) end protected @@ -137,23 +137,36 @@ def render_query_string_params end end - def render_body + def render_body(replace_post_filter: false) exceptions = %i[filter query none] + QUERY_STRING_STORAGES + if replace_post_filter + exceptions += %i[post_filter] + end + body = @storages.except(*exceptions).values.inject({}) do |result, storage| result.merge!(storage.render || {}) end - body.merge!(render_query || {}) + body.merge!(render_query(replace_post_filter: replace_post_filter) || {}) {body: body} end - def render_query + def render_query(replace_post_filter: false) none = @storages[:none].render return none if none + post_filter = @storages[:post_filter].render filter = @storages[:filter].render query = @storages[:query].render + if replace_post_filter && post_filter + if query + query = {query: {bool: {must: [query[:query], post_filter[:post_filter]]}}} + else + query = {query: {bool: {must: [post_filter[:post_filter]]}}} + end + end + return query unless filter if query && query[:query][:bool] diff --git a/lib/chewy/search/request.rb b/lib/chewy/search/request.rb index 6c7a62156..eb06bee73 100644 --- a/lib/chewy/search/request.rb +++ b/lib/chewy/search/request.rb @@ -77,6 +77,10 @@ def parameters @parameters ||= Parameters.new end + def set_x_opaque_id(x_opaque_id) + @x_opaque_id = x_opaque_id + end + # Compare two scopes or scope with a collection of wrappers. # If other is a collection it performs the request to fetch # data from ES. @@ -117,8 +121,8 @@ def response=(from_elasticsearch) # ES request body # # @return [Hash] request body - def render - @render ||= parameters.render + def render(replace_post_filter: false) + @render ||= parameters.render(replace_post_filter: replace_post_filter) end # Includes the class name and the result of rendering. @@ -556,7 +560,7 @@ def reorder(value, *values) # @return [Chewy::Search::Request] %i[source stored_fields].each do |name| define_method name do |value, *values| - modify(name) { update!(values.empty? ? value : [value, *values]) } + modify(name) { update!(['_index_type', *value, *values]) } end end @@ -841,7 +845,11 @@ def count if performed? total else - Chewy.client.count(only(WHERE_STORAGES).render)['count'] + count_params = only(WHERE_STORAGES).render(replace_post_filter: true) + if @x_opaque_id + count_params.merge!({opaque_id: @x_opaque_id}) + end + Chewy.client(_indices.first.hosts_name).count(count_params)['count'] end rescue Elasticsearch::Transport::Transport::Errors::NotFound 0 diff --git a/lib/chewy/type/adapter/mongoid.rb b/lib/chewy/type/adapter/mongoid.rb new file mode 100644 index 000000000..71e018826 --- /dev/null +++ b/lib/chewy/type/adapter/mongoid.rb @@ -0,0 +1,108 @@ +require 'chewy/type/adapter/orm' + +module Chewy + class Type + module Adapter + class Mongoid < Orm + def self.accepts?(target) + defined?(::Mongoid::Document) && ( + target.is_a?(Class) && target.ancestors.include?(::Mongoid::Document) || + target.is_a?(::Mongoid::Criteria)) + end + + def identify(collection) + super(collection).map { |id| id.is_a?(BSON::ObjectId) ? id.to_s : id } + end + + private + + def cleanup_default_scope! + Chewy.logger.warn('Default type scope order, limit and offset are ignored and will be nullified') if Chewy.logger && @default_scope.options.values_at(:sort, :limit, :skip).compact.present? + + @default_scope.options.delete(:limit) + @default_scope.options.delete(:skip) + @default_scope = @default_scope.reorder(nil) + end + + def import_scope(scope, options) + pluck_in_batches(scope, **options.slice(:batch_size)).map do |ids| + yield grouped_objects(default_scope_where_ids_in(ids)) + end.all? + end + + def import_objects(collection, options) + direct_import = (default_scope.selector.empty? || @options[:searchable_proc]) && + !options[:raw_import] && + collection.is_a?(Array) && + !collection.empty? && + collection.all? { |item| item.is_a?(::Mongoid::Document) && item.__selected_fields.nil? } + + collection_ids = identify(collection) + hash = Hash[collection_ids.map(&:to_s).zip(collection)] + + indexed = collection_ids.each_slice(options[:batch_size]).map do |ids| + batch = if options[:raw_import] + raw_default_scope_where_ids_in(ids, options[:raw_import]) + elsif direct_import + hash.values_at(*ids.map(&:to_s)) + else + default_scope_where_ids_in(ids) + end + + batch = batch.to_a + + # If it's not searchable/indexable, we should delete it. + if direct_import && @options[:searchable_proc] + batch = batch.select { |object| @options[:searchable_proc].call(object) } + end + + if batch.empty? + true + else + batch.each { |object| hash.delete(object.send(primary_key).to_s) } + yield grouped_objects(batch) + end + end.all? + + deleted = hash.keys.each_slice(options[:batch_size]).map do |group| + yield delete: hash.values_at(*group) + end.all? + + indexed && deleted + end + + def primary_key + :_id + end + + def pluck(scope, fields: []) + scope.pluck(primary_key, *fields) + end + + def pluck_in_batches(scope, fields: [], batch_size: nil, **options) + return enum_for(:pluck_in_batches, scope, fields: fields, batch_size: batch_size, **options) unless block_given? + + scope.batch_size(batch_size).no_timeout.pluck(primary_key, *fields).each_slice(batch_size) do |batch| + yield batch + end + end + + def scope_where_ids_in(scope, ids) + scope.where(primary_key.in => ids) + end + + def all_scope + target.all + end + + def relation_class + ::Mongoid::Criteria + end + + def object_class + ::Mongoid::Document + end + end + end + end +end diff --git a/lib/chewy/type/adapter/sequel.rb b/lib/chewy/type/adapter/sequel.rb new file mode 100644 index 000000000..751a00836 --- /dev/null +++ b/lib/chewy/type/adapter/sequel.rb @@ -0,0 +1,93 @@ +require 'chewy/type/adapter/base' + +module Chewy + class Type + module Adapter + class Sequel < Orm + attr_reader :default_scope + alias_method :default_dataset, :default_scope + + def self.accepts?(target) + defined?(::Sequel::Model) && ( + target.is_a?(Class) && target < ::Sequel::Model || + target.is_a?(::Sequel::Dataset)) + end + + private + + def cleanup_default_scope! + Chewy.logger.warn('Default type scope order, limit and offset are ignored and will be nullified') if Chewy.logger && @default_scope != @default_scope.unordered.unlimited + + @default_scope = @default_scope.unordered.unlimited + end + + def import_scope(scope, options) + pluck_in_batches(scope, **options.slice(:batch_size)).inject(true) do |result, ids| + result & yield(grouped_objects(default_scope_where_ids_in(ids).all)) + end + end + + def primary_key + target.primary_key + end + + def full_column_name(column) + ::Sequel.qualify(target.table_name, column) + end + + def all_scope + target.dataset + end + + def target_columns + @target_columns ||= target.columns.to_set + end + + def pluck(scope, fields: []) + fields = fields.map(&:to_sym).unshift(primary_key).map do |column| + target_columns.include?(column) ? full_column_name(column) : column + end + scope.distinct.select_map(fields.one? ? fields.first : fields) + end + + def pluck_in_batches(scope, fields: [], batch_size: nil, **options) + return enum_for(:pluck_in_batches, scope, fields: fields, batch_size: batch_size, **options) unless block_given? + + scope = scope.unordered.order(full_column_name(primary_key).asc).limit(batch_size) + + ids = pluck(scope, fields: fields) + count = 0 + + while ids.present? + yield ids + break if ids.size < batch_size + last_id = ids.last.is_a?(Array) ? ids.last.first : ids.last + ids = pluck(scope.where { |_o| full_column_name(primary_key) > last_id }, fields: fields) + end + + count + end + + def scope_where_ids_in(scope, ids) + scope.where(full_column_name(primary_key) => Array.wrap(ids)) + end + + def model_of_relation(relation) + relation.model + end + + def relation_class + ::Sequel::Dataset + end + + def object_class + ::Sequel::Model + end + + def load_scope_objects(*args) + super.all + end + end + end + end +end diff --git a/spec/chewy/type/import/bulk_builder_spec.rb b/spec/chewy/type/import/bulk_builder_spec.rb new file mode 100644 index 000000000..094b51e2d --- /dev/null +++ b/spec/chewy/type/import/bulk_builder_spec.rb @@ -0,0 +1,279 @@ +require 'spec_helper' + +describe Chewy::Type::Import::BulkBuilder do + before { Chewy.massacre } + + subject { described_class.new(type, index: index, delete: delete, fields: fields) } + let(:type) { PlacesIndex::City } + let(:index) { [] } + let(:delete) { [] } + let(:fields) { [] } + + describe '#bulk_body' do + context 'simple bulk', :orm do + before do + stub_model(:city) + stub_index(:places) do + define_type City do + field :name, :rating + end + end + end + let(:cities) { Array.new(3) { |i| City.create!(id: i + 1, name: "City#{i + 17}", rating: 42) } } + + specify { expect(subject.bulk_body).to eq([]) } + + context do + let(:index) { cities } + specify do + expect(subject.bulk_body).to eq([ + {index: {_id: 1, data: {'name' => 'City17', 'rating' => 42}}}, + {index: {_id: 2, data: {'name' => 'City18', 'rating' => 42}}}, + {index: {_id: 3, data: {'name' => 'City19', 'rating' => 42}}} + ]) + end + end + + context do + let(:delete) { cities } + specify do + expect(subject.bulk_body).to eq([ + {delete: {_id: 1}}, {delete: {_id: 2}}, {delete: {_id: 3}} + ]) + end + end + + context do + let(:index) { cities.first(2) } + let(:delete) { [cities.last] } + specify do + expect(subject.bulk_body).to eq([ + {index: {_id: 1, data: {'name' => 'City17', 'rating' => 42}}}, + {index: {_id: 2, data: {'name' => 'City18', 'rating' => 42}}}, + {delete: {_id: 3}} + ]) + end + + context ':fields' do + let(:fields) { %w[name] } + specify do + expect(subject.bulk_body).to eq([ + {update: {_id: 1, data: {doc: {'name' => 'City17'}}}}, + {update: {_id: 2, data: {doc: {'name' => 'City18'}}}}, + {delete: {_id: 3}} + ]) + end + end + end + end + + context 'parent-child relationship', :orm do + before do + stub_model(:country) + stub_model(:city) + adapter == :sequel ? City.many_to_one(:country) : City.belongs_to(:country) + end + + before do + stub_index(:places) do + define_type Country do + field :name + end + + define_type City do + root parent: 'country', parent_id: -> { country_id } do + field :name + field :rating + end + end + end + end + + before { PlacesIndex::Country.import(country) } + let(:country) { Country.create!(id: 1, name: 'country') } + let(:another_country) { Country.create!(id: 2, name: 'another country') } + let(:city) { City.create!(id: 4, country_id: country.id, name: 'city', rating: 42) } + + context 'indexing' do + let(:index) { [city] } + + specify do + expect(subject.bulk_body).to eq([ + {index: {_id: city.id, parent: country.id, data: {'name' => 'city', 'rating' => 42}}} + ]) + end + + context do + let(:fields) { %w[name] } + + specify do + expect(subject.bulk_body).to eq([ + {update: {_id: city.id, parent: country.id, data: {doc: {'name' => 'city'}}}} + ]) + end + end + end + + context 'updating parent' do + before do + PlacesIndex::City.import(city) + city.update(country_id: another_country.id) + end + let(:index) { [city] } + + specify do + expect(subject.bulk_body).to eq([ + {delete: {_id: city.id, parent: country.id.to_s}}, + {index: {_id: city.id, parent: another_country.id, data: {'name' => 'city', 'rating' => 42}}} + ]) + end + + context do + let(:fields) { %w[name] } + + specify do + expect(subject.bulk_body).to eq([ + {delete: {_id: city.id, parent: country.id.to_s}}, + {index: {_id: city.id, parent: another_country.id, data: {'name' => 'city', 'rating' => 42}}} + ]) + end + end + end + + context 'destroying' do + before { PlacesIndex::City.import(city) } + let(:delete) { [city] } + + specify do + expect(subject.bulk_body).to eq([ + {delete: {_id: city.id, parent: country.id.to_s}} + ]) + end + end + end + + context 'custom id', :orm do + before do + stub_model(:city) + end + + before do + stub_index(:places) do + define_type City do + root id: -> { name } do + field :rating + end + end + end + end + + let(:london) { City.create(id: 1, name: 'London', rating: 4) } + + specify do + expect { PlacesIndex::City.import(london) } + .to update_index(PlacesIndex::City).and_reindex(london.name) + end + + context 'indexing' do + let(:index) { [london] } + + specify do + expect(subject.bulk_body).to eq([ + {index: {_id: london.name, data: {'rating' => 4}}} + ]) + end + end + + context 'destroying' do + let(:delete) { [london] } + + specify do + expect(subject.bulk_body).to eq([ + {delete: {_id: london.name}} + ]) + end + end + end + + context 'crutches' do + before do + stub_index(:places) do + define_type :city do + crutch :names do |collection| + collection.map { |item| [item.id, "Name#{item.id}"] }.to_h + end + + field :name, value: ->(o, c) { c.names[o.id] } + end + end + end + + let(:index) { [double(id: 42)] } + + specify do + expect(subject.bulk_body).to eq([ + {index: {_id: 42, data: {'name' => 'Name42'}}} + ]) + end + + context 'witchcraft' do + before { PlacesIndex::City.witchcraft! } + specify do + expect(subject.bulk_body).to eq([ + {index: {_id: 42, data: {'name' => 'Name42'}}} + ]) + end + end + end + + context 'empty ids' do + before do + stub_index(:places) do + define_type :city do + field :name + end + end + end + + let(:index) { [{id: 1, name: 'Name0'}, double(id: '', name: 'Name1'), double(name: 'Name2')] } + let(:delete) { [double(id: '', name: 'Name3'), {name: 'Name4'}, '', 2] } + + specify do + expect(subject.bulk_body).to eq([ + {index: {_id: 1, data: {'name' => 'Name0'}}}, + {index: {data: {'name' => 'Name1'}}}, + {index: {data: {'name' => 'Name2'}}}, + {delete: {_id: {'name' => 'Name4'}}}, + {delete: {_id: 2}} + ]) + end + + context do + let(:fields) { %w[name] } + + specify do + expect(subject.bulk_body).to eq([ + {update: {_id: 1, data: {doc: {'name' => 'Name0'}}}}, + {delete: {_id: {'name' => 'Name4'}}}, + {delete: {_id: 2}} + ]) + end + end + end + end + + describe '#index_objects_by_id' do + before do + stub_index(:places) do + define_type :city do + field :name + end + end + end + + let(:index) { [double(id: 1), double(id: 2), double(id: ''), double] } + let(:delete) { [double(id: 3)] } + + specify { expect(subject.index_objects_by_id).to eq('1' => index.first, '2' => index.second) } + end +end diff --git a/spec/chewy/type/observe_spec.rb b/spec/chewy/type/observe_spec.rb new file mode 100644 index 000000000..d8afd05a3 --- /dev/null +++ b/spec/chewy/type/observe_spec.rb @@ -0,0 +1,137 @@ +require 'spec_helper' + +describe Chewy::Type::Observe do + describe '.update_index' do + before do + stub_index(:dummies) do + define_type :dummy + end + end + + let(:backreferenced) { Array.new(3) { |i| double(id: i) } } + + specify do + expect { DummiesIndex::Dummy.update_index(backreferenced) } + .to raise_error Chewy::UndefinedUpdateStrategy + end + specify do + expect { DummiesIndex::Dummy.update_index([]) } + .not_to update_index('dummies#dummy') + end + specify do + expect { DummiesIndex::Dummy.update_index(nil) } + .not_to update_index('dummies#dummy') + end + end + + context 'integration', :orm do + let(:update_condition) { true } + + before do + city_countries_update_proc = if adapter == :sequel + ->(*) { previous_changes.try(:[], :country_id) || country } + else + ->(*) { changes['country_id'] || previous_changes['country_id'] || country } + end + + stub_model(:city) do + update_index(->(city) { "cities##{city.class.name.underscore}" }) { self } + update_index 'countries#country', &city_countries_update_proc + end + + stub_model(:country) do + update_index('cities#city', if: -> { update_condition }) { cities } + update_index(-> { "countries##{self.class.name.underscore}" }, :self) + attr_accessor :update_condition + end + + if adapter == :sequel + City.many_to_one :country + Country.one_to_many :cities + City.plugin :dirty + else + if adapter == :mongoid && Mongoid::VERSION.start_with?('6') + City.belongs_to :country, optional: true + else + City.belongs_to :country + end + Country.has_many :cities + end + + stub_index(:cities) do + define_type City + end + + stub_index(:countries) do + define_type Country + end + end + + context do + let!(:country1) { Chewy.strategy(:atomic) { Country.create!(id: 1, update_condition: update_condition) } } + let!(:country2) { Chewy.strategy(:atomic) { Country.create!(id: 2, update_condition: update_condition) } } + let!(:city) { Chewy.strategy(:atomic) { City.create!(id: 1, country: country1) } } + + specify { expect { city.save! }.to update_index('cities#city').and_reindex(city).only } + specify { expect { city.save! }.to update_index('countries#country').and_reindex(country1).only } + + specify { expect { city.update!(country: nil) }.to update_index('cities#city').and_reindex(city).only } + specify { expect { city.update!(country: nil) }.to update_index('countries#country').and_reindex(country1).only } + + specify { expect { city.update!(country: country2) }.to update_index('cities#city').and_reindex(city).only } + specify { expect { city.update!(country: country2) }.to update_index('countries#country').and_reindex(country1, country2).only } + end + + context do + let!(:country) do + Chewy.strategy(:atomic) do + cities = Array.new(2) { |i| City.create!(id: i) } + if adapter == :sequel + Country.create(id: 1, update_condition: update_condition).tap do |country| + cities.each { |city| country.add_city(city) } + end + else + Country.create!(id: 1, cities: cities, update_condition: update_condition) + end + end + end + + specify { expect { country.save! }.to update_index('cities#city').and_reindex(country.cities).only } + specify { expect { country.save! }.to update_index('countries#country').and_reindex(country).only } + + context 'conditional update' do + let(:update_condition) { false } + specify { expect { country.save! }.not_to update_index('cities#city') } + end + end + end + + context 'transactions', :active_record do + context do + before { stub_model(:city) { update_index 'cities#city', :self } } + before { stub_index(:cities) { define_type City } } + + specify do + Chewy.strategy(:urgent) do + ActiveRecord::Base.transaction do + expect { City.create! }.not_to update_index('cities#city') + end + end + end + end + + context do + before { allow(Chewy).to receive_messages(use_after_commit_callbacks: false) } + before { stub_model(:city) { update_index 'cities#city', :self } } + before { stub_index(:cities) { define_type City } } + + specify do + Chewy.strategy(:urgent) do + ActiveRecord::Base.transaction do + expect { City.create! }.to update_index('cities#city') + end + end + end + end + end +end