Elasticsearch + Spark: write json with custom document _id

Finally I found the problem: it was a typo in the config.

[JsonExtractor for field [superId]] cannot extract value from entity [class java.lang.String] | instance [{...,"superID":"7f48c8ee6a8a"}]

It was looking for a field superId, but there was only superID (note the case). The question is also a bit misleading in this respect, since in the code there it appears as "es.mapping.id", "superID" (which was not what the failing code actually contained).

The actual solution is what Levi Ramsey suggested:

import org.elasticsearch.spark.rdd.EsSpark

// One JSON document per RDD element; superID is a regular field of the document.
val json = """{"foo":"bar","superID":"deadbeef"}"""

// makeRDD lives on SparkContext, not on the SparkSession itself
val rdd = spark.sparkContext.makeRDD(Seq(json))
val cfg = Map(
  ("es.mapping.id", "superID"),   // must match the JSON field name exactly (case-sensitive)
  ("es.resource", "myindex/mytype")
)
EsSpark.saveJsonToEs(rdd, cfg = cfg)

The difference is that es.mapping.id cannot be _id (as was suggested in the original post): _id is metadata, and Elasticsearch does not accept it as a regular document field.

Naturally this means that the new field superID has to be added to the mapping (unless the mapping is dynamic). If storing the additional field in the index is a burden, one should also (see the sketch after this list):

  • exclude it from the mapping
  • and disable its indexing
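
A minimal sketch of what that could look like, assuming Elasticsearch 6.x mapping syntax and the myindex/mytype names from above (one way to read the two points is excluding the field from _source and setting "index": false on it — adjust for your ES version):

val mappingJson =
  """{
    |  "mappings": {
    |    "mytype": {
    |      "_source": { "excludes": ["superID"] },
    |      "properties": {
    |        "superID": { "type": "keyword", "index": false }
    |      }
    |    }
    |  }
    |}""".stripMargin
// PUT this body when creating the index (e.g. PUT /myindex), before writing;
// superID is then used only to route the document _id and is not stored or indexed itself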

Thanks a lot to Alex Savitsky for pointing in the right direction.

Have you tried something like:

import org.apache.spark.rdd.RDD
import org.elasticsearch.spark._   // brings saveJsonToEs into scope as an implicit

val rdd: RDD[String] = job.map { r => r.toJson() }
val cfg = Map(
  ("es.mapping.id", "_id")   // pull the document id from the "_id" field of each JSON document
)
rdd.saveJsonToEs("myindex/mytype", cfg)

I've tested it with the elasticsearch-hadoop connector (version 2.4.5) against ES 1.7, and it works.
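
For reference, the matching sbt dependency should be something along these lines (the artifact coordinates are my assumption from the version above, so double-check them against Maven Central):

libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "2.4.5"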

It can be done by passing the ES_INPUT_JSON option in the cfg parameter map and by returning, from the map function, a tuple whose first element is the document id and whose second element is the document serialized as JSON.

I tested it with "org.elasticsearch" %% "elasticsearch-spark-20" % "[6.0,7.0[" against Elasticsearch 6.4:

import org.elasticsearch.hadoop.cfg.ConfigurationOptions.{ES_INPUT_JSON, ES_NODES}
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._

job
  .map { r => (r._id, r.toJson()) }   // (document id, JSON-serialized document)
  .saveToEsWithMeta(
    "myindex/mytype",
    Map(
      ES_NODES -> "https://localhost:9200",
      ES_INPUT_JSON -> true.toString   // treat the tuple's second element as ready-made JSON
    )
  )

I spent days banging my head against the wall trying to figure out why saveToEsWithMeta would not work when I used a string for the ID like so:

import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_INPUT_JSON
import org.elasticsearch.spark._

rdd
  .map(caseClassContainingJson =>
    (caseClassContainingJson._idWhichIsAString, caseClassContainingJson.jsonString)
  )
  .saveToEsWithMeta(s"$nationalShapeIndexName/$nationalShapeIndexType", Map(
    ES_INPUT_JSON -> true.toString
  ))

This throws JSON parsing-related errors, which deceptively lead you to think the issue is with your JSON; but then you log each of your documents and see that they're all valid.

It turns out that, for whatever reason, ES_INPUT_JSON -> true makes the left-hand side of the tuple, i.e. the ID, get parsed as JSON too!

The solution is to JSON-stringify the ID (which wraps it in an extra pair of double quotes) so that it parses as valid JSON:

import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_INPUT_JSON
import org.elasticsearch.spark._
import play.api.libs.json.{JsString, Json}

rdd
  .map(caseClassContainingJson =>
    (
      // Json.stringify(JsString("abc")) yields "\"abc\"", a valid JSON string literal
      Json.stringify(JsString(caseClassContainingJson._idWhichIsAString)),
      caseClassContainingJson.jsonString
    )
  )
  .saveToEsWithMeta(s"$nationalShapeIndexName/$nationalShapeIndexType", Map(
    ES_INPUT_JSON -> true.toString
  ))