1
0
Fork 0

implemented Creation of Raw Graph and Resolution

This commit is contained in:
Sandro La Bruzzo 2021-06-30 17:27:55 +02:00
parent 623a0c4edb
commit 1a6b398968
14 changed files with 227 additions and 1057 deletions

View File

@ -1,3 +0,0 @@
package eu.dnetlib.dhp.sx.graph
case class IdReplace(newId:String, oldId:String) {}

View File

@ -1,153 +0,0 @@
package eu.dnetlib.dhp.sx.graph;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.bson.Document;
import org.bson.conversions.Bson;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.QueryBuilder;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
/**
* This job is responsible to collect data from mongoDatabase and store in a sequence File on HDFS Mongo database
* contains information of each MDSTore in two collections: -metadata That contains info like: ID, format, layout,
* interpretation -metadataManager: that contains info : ID, mongoCollectionName from the metadata collection we filter
* the ids with Format, layout, and Interpretation from the metadataManager we get the current MONGO collection name
* which contains metadata XML see function getCurrentId
* <p>
* This Job will be called different times in base at the triple we want import, and generates for each triple a
* sequence file of XML
*/
public class ImportDataFromMongo {
/**
* It requires in input some parameters described on a file
* eu/dnetlib/dhp/graph/sx/import_from_mongo_parameters.json
* <p>
* - the name node - the paht where store HDFS File - the mongo host - the mongo port - the metadata format to
* import - the metadata layout to import - the metadata interpretation to import - the mongo database Name
* <p>
* This params are encoded into args
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
ImportDataFromMongo.class
.getResourceAsStream(
"/eu/dnetlib/dhp/sx/graph/argumentparser/import_from_mongo_parameters.json")));
parser.parseArgument(args);
final int port = Integer.parseInt(parser.get("dbport"));
final String host = parser.get("dbhost");
final String format = parser.get("format");
final String layout = parser.get("layout");
final String interpretation = parser.get("interpretation");
final String dbName = parser.get("dbName");
final MongoClient client = new MongoClient(host, port);
MongoDatabase database = client.getDatabase(dbName);
MongoCollection<Document> metadata = database.getCollection("metadata");
MongoCollection<Document> metadataManager = database.getCollection("metadataManager");
final DBObject query = QueryBuilder
.start("format")
.is(format)
.and("layout")
.is(layout)
.and("interpretation")
.is(interpretation)
.get();
final List<String> ids = new ArrayList<>();
metadata
.find((Bson) query)
.forEach((Consumer<Document>) document -> ids.add(document.getString("mdId")));
List<String> databaseId = ids
.stream()
.map(it -> getCurrentId(it, metadataManager))
.filter(Objects::nonNull)
.collect(Collectors.toList());
final String hdfsuri = parser.get("namenode");
// ====== Init HDFS File System Object
Configuration conf = new Configuration();
// Set FileSystem URI
conf.set("fs.defaultFS", hdfsuri);
// Because of Maven
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
FileSystem.get(URI.create(hdfsuri), conf);
Path hdfswritepath = new Path(parser.get("targetPath"));
final AtomicInteger counter = new AtomicInteger(0);
try (SequenceFile.Writer writer = SequenceFile
.createWriter(
conf,
SequenceFile.Writer.file(hdfswritepath),
SequenceFile.Writer.keyClass(IntWritable.class),
SequenceFile.Writer.valueClass(Text.class))) {
final IntWritable key = new IntWritable(counter.get());
final Text value = new Text();
databaseId
.forEach(
id -> {
System.out.println("Reading :" + id);
MongoCollection<Document> collection = database.getCollection(id);
collection
.find()
.forEach(
(Consumer<Document>) document -> {
key.set(counter.getAndIncrement());
value.set(document.getString("body"));
if (counter.get() % 10000 == 0) {
System.out.println("Added " + counter.get());
}
try {
writer.append(key, value);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
});
}
}
/**
* Return the name of mongo collection giving an MdStore ID
*
* @param mdId The id of the MDStore
* @param metadataManager The collection metadataManager on mongo which contains this information
* @return
*/
private static String getCurrentId(
final String mdId, final MongoCollection<Document> metadataManager) {
FindIterable<Document> result = metadataManager.find((Bson) QueryBuilder.start("mdId").is(mdId).get());
final Document item = result.first();
return item == null ? null : item.getString("currentId");
}
}

View File

@ -0,0 +1,79 @@
package eu.dnetlib.dhp.sx.graph
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset}
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
object SparkCreateInputGraph {
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/extract_entities_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val resultObject = List(
("publication", classOf[Publication]),
("dataset", classOf[OafDataset]),
("software", classOf[Software]),
("otherResearchProduct", classOf[OtherResearchProduct])
)
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
val sourcePath = parser.get("sourcePath")
log.info(s"sourcePath -> $sourcePath")
val targetPath = parser.get("targetPath")
log.info(s"targetPath -> $targetPath")
val oafDs:Dataset[Oaf] = spark.read.load(s"$sourcePath/*").as[Oaf]
resultObject.foreach(r => extractEntities(oafDs,s"$targetPath/extracted/${r._1}", r._2, log))
extractEntities(oafDs,s"$targetPath/extracted/relation", classOf[Relation], log)
resultObject.foreach { r =>
log.info(s"Make ${r._1} unique")
makeDatasetUnique(s"$targetPath/extracted/${r._1}",s"$targetPath/dedup/${r._1}",spark, r._2)
}
}
def extractEntities[T <: Oaf ](oafDs:Dataset[Oaf], targetPath:String, clazz:Class[T], log:Logger) :Unit = {
implicit val resEncoder: Encoder[T] = Encoders.kryo(clazz)
log.info(s"Extract ${clazz.getSimpleName}")
oafDs.filter(o => o.isInstanceOf[T]).map(p => p.asInstanceOf[T]).write.mode(SaveMode.Overwrite).save(targetPath)
}
def makeDatasetUnique[T <: Result ](sourcePath:String, targetPath:String, spark:SparkSession, clazz:Class[T]) :Unit = {
import spark.implicits._
implicit val resEncoder: Encoder[T] = Encoders.kryo(clazz)
val ds:Dataset[T] = spark.read.load(sourcePath).as[T]
ds.groupByKey(_.getId).reduceGroups{(x,y) =>
x.mergeFrom(y)
x
}.map(_._2).write.mode(SaveMode.Overwrite).save(targetPath)
}
}

View File

@ -1,126 +0,0 @@
package eu.dnetlib.dhp.sx.graph;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import net.minidev.json.JSONArray;
/**
* This Job extracts a typology of entity and stores it in a new RDD This job is called different times, for each file
* generated by the Job {@link ImportDataFromMongo} and store the new RDD in a path that should be under a folder:
* extractedEntities/entity/version1
* <p>
* at the end of this process we will have : extractedEntities/dataset/version1 extractedEntities/dataset/version2
* extractedEntities/dataset/... extractedEntities/publication/version1 extractedEntities/publication/version2
* extractedEntities/publication/... extractedEntities/unknown/version1 extractedEntities/unknown/version2
* extractedEntities/unknown/... extractedEntities/relation/version1 extractedEntities/relation/version2
* extractedEntities/relation/...
*/
public class SparkExtractEntitiesJob {
static final String IDJSONPATH = "$.id";
static final String SOURCEJSONPATH = "$.source";
static final String TARGETJSONPATH = "$.target";
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkExtractEntitiesJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json")));
parser.parseArgument(args);
final SparkSession spark = SparkSession
.builder()
.appName(SparkExtractEntitiesJob.class.getSimpleName())
.master(parser.get("master"))
.getOrCreate();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final String inputPath = parser.get("sourcePath");
final String targetPath = parser.get("targetPath");
final String tdir = parser.get("targetDir");
final JavaRDD<String> inputRDD = sc.textFile(inputPath);
List<String> entities = Arrays
.stream(parser.get("entities").split(","))
.map(String::trim)
.collect(Collectors.toList());
if (entities.stream().anyMatch("dataset"::equalsIgnoreCase)) {
// Extract Dataset
inputRDD
.filter(SparkExtractEntitiesJob::isDataset)
.saveAsTextFile(targetPath + "/dataset/" + tdir, GzipCodec.class);
}
if (entities.stream().anyMatch("unknown"::equalsIgnoreCase)) {
// Extract Unknown
inputRDD
.filter(SparkExtractEntitiesJob::isUnknown)
.saveAsTextFile(targetPath + "/unknown/" + tdir, GzipCodec.class);
}
if (entities.stream().anyMatch("relation"::equalsIgnoreCase)) {
// Extract Relation
inputRDD
.filter(SparkExtractEntitiesJob::isRelation)
.saveAsTextFile(targetPath + "/relation/" + tdir, GzipCodec.class);
}
if (entities.stream().anyMatch("publication"::equalsIgnoreCase)) {
// Extract Relation
inputRDD
.filter(SparkExtractEntitiesJob::isPublication)
.saveAsTextFile(targetPath + "/publication/" + tdir, GzipCodec.class);
}
}
public static boolean isDataset(final String json) {
final String id = getJPathString(IDJSONPATH, json);
if (StringUtils.isBlank(id))
return false;
return id.startsWith("60|");
}
public static boolean isPublication(final String json) {
final String id = getJPathString(IDJSONPATH, json);
if (StringUtils.isBlank(id))
return false;
return id.startsWith("50|");
}
public static boolean isUnknown(final String json) {
final String id = getJPathString(IDJSONPATH, json);
if (StringUtils.isBlank(id))
return false;
return id.startsWith("70|");
}
public static boolean isRelation(final String json) {
final String source = getJPathString(SOURCEJSONPATH, json);
final String target = getJPathString(TARGETJSONPATH, json);
return StringUtils.isNotBlank(source) && StringUtils.isNotBlank(target);
}
public static String getJPathString(final String jsonPath, final String json) {
try {
Object o = JsonPath.read(json, jsonPath);
if (o instanceof String)
return (String) o;
if (o instanceof JSONArray && ((JSONArray) o).size() > 0)
return (String) ((JSONArray) o).get(0);
return "";
} catch (Exception e) {
return "";
}
}
}

View File

@ -0,0 +1,90 @@
package eu.dnetlib.dhp.sx.graph
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Relation, Result}
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
object SparkResolveRelation {
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val relationPath = parser.get("relationPath")
log.info(s"sourcePath -> $relationPath")
val entityPath = parser.get("entityPath")
log.info(s"targetPath -> $entityPath")
val workingPath = parser.get("workingPath")
log.info(s"workingPath -> $workingPath")
implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
import spark.implicits._
val entities:Dataset[Result] = spark.read.load(s"$entityPath/*").as[Result]
entities.flatMap(e => e.getPid.asScala
.map(p =>
convertPidToDNETIdentifier(p.getValue, p.getQualifier.getClassid))
.filter(s => s!= null)
.map(s => (s,e.getId))
).groupByKey(_._1)
.reduceGroups((x,y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y)
.map(s =>s._2)
.write
.mode(SaveMode.Overwrite)
.save(s"$workingPath/resolvedPid")
val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/resolvedPid").as[(String,String)]
val relationDs:Dataset[(String,Relation)] = spark.read.load(relationPath).as[Relation].map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_1")), "left").map{
m =>
val sourceResolved = m._2
val currentRelation = m._1._2
if (sourceResolved!=null && sourceResolved._2.nonEmpty)
currentRelation.setSource(sourceResolved._2)
currentRelation
}.write
.mode(SaveMode.Overwrite)
.save(s"$workingPath/resolvedSource")
val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/resolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_1")), "left").map{
m =>
val targetResolved = m._2
val currentRelation = m._1._2
if (targetResolved!=null && targetResolved._2.nonEmpty)
currentRelation.setTarget(targetResolved._2)
currentRelation
}.filter(r => r.getSource.startsWith("50")&& r.getTarget.startsWith("50"))
.write
.mode(SaveMode.Overwrite)
.save(s"$workingPath/resolvedRelation")
}
def convertPidToDNETIdentifier(pid:String, pidType: String):String = {
if (pid==null || pid.isEmpty || pidType== null || pidType.isEmpty)
null
else
s"unresolved::${pid.toLowerCase}::${pidType.toLowerCase}"
}
}

View File

@ -1,75 +0,0 @@
package eu.dnetlib.dhp.sx.graph;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2;
/**
* In some case the identifier generated for the Entity in @{@link SparkExtractEntitiesJob} is different from the
* identifier * associated by the aggregator, this means that some relation points to missing identifier To avoid this
* problem we store in the model the Id and the OriginalObJIdentifier This jobs extract this pair and creates a Similar
* relation that will be used in SparkMergeEntities
*/
public class SparkSXGeneratePidSimlarity {
static final String IDJSONPATH = "$.id";
static final String OBJIDPATH = "$.originalObjIdentifier";
public static void generateDataFrame(
final SparkSession spark,
final JavaSparkContext sc,
final String inputPath,
final String targetPath) {
final JavaPairRDD<String, String> datasetSimRel = sc
.textFile(inputPath + "/dataset/*")
.mapToPair(
(PairFunction<String, String, String>) k -> new Tuple2<>(
DHPUtils.getJPathString(IDJSONPATH, k),
DHPUtils.getJPathString(OBJIDPATH, k)))
.filter(
t -> !StringUtils
.substringAfter(t._1(), "|")
.equalsIgnoreCase(StringUtils.substringAfter(t._2(), "::")))
.distinct();
final JavaPairRDD<String, String> publicationSimRel = sc
.textFile(inputPath + "/publication/*")
.mapToPair(
(PairFunction<String, String, String>) k -> new Tuple2<>(
DHPUtils.getJPathString(IDJSONPATH, k),
DHPUtils.getJPathString(OBJIDPATH, k)))
.filter(
t -> !StringUtils
.substringAfter(t._1(), "|")
.equalsIgnoreCase(StringUtils.substringAfter(t._2(), "::")))
.distinct();
JavaRDD<Relation> simRel = datasetSimRel
.union(publicationSimRel)
.map(
s -> {
final Relation r = new Relation();
r.setSource(s._1());
r.setTarget(s._2());
r.setRelType("similar");
return r;
});
spark
.createDataset(simRel.rdd(), Encoders.bean(Relation.class))
.distinct()
.write()
.mode(SaveMode.Overwrite)
.save(targetPath + "/pid_simRel");
}
}

View File

@ -1,256 +0,0 @@
package eu.dnetlib.dhp.sx.graph;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset;
import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication;
import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown;
import eu.dnetlib.dhp.utils.DHPUtils;
import net.minidev.json.JSONArray;
import scala.Tuple2;
/**
* This job is responsible of the creation of RAW Graph It is applied to the different entities generated from
* {@link SparkExtractEntitiesJob} In case of dataset, publication and Unknown Entities we group all the entities of the
* same type by their identifier, and then in the reduce phase we merge all the entities. Merge means: -merge all the
* metadata -merge the collected From values
* <p>
* In case of relation we need to make a different work: -Phase 1: Map reduce jobs Map: Get all Relation and emit a key
* constructed by (source, relType, Target) and the relation itself Reduce: Merge all relations Looking at the javadoc
* of {@link SparkSXGeneratePidSimlarity} we take the dataset of pid relation and joining by source and target we
* replace the wrong identifier in the relation with the correct ones. At the end we replace the new Dataset of Relation
*/
public class SparkScholexplorerCreateRawGraphJob {
static final String IDJSONPATH = "$.id";
static final String SOURCEJSONPATH = "$.source";
static final String TARGETJSONPATH = "$.target";
static final String RELJSONPATH = "$.relType";
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkScholexplorerCreateRawGraphJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/sx/graph/argumentparser/merge_entities_scholix_parameters.json")));
parser.parseArgument(args);
final SparkSession spark = SparkSession
.builder()
.config(
new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))
.appName(SparkScholexplorerCreateRawGraphJob.class.getSimpleName())
.master(parser.get("master"))
.getOrCreate();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final String inputPath = parser.get("sourcePath");
final String targetPath = parser.get("targetPath");
final String entity = parser.get("entity");
FileSystem fs = FileSystem.get(sc.sc().hadoopConfiguration());
List<Path> subFolder = Arrays
.stream(fs.listStatus(new Path(inputPath)))
.filter(FileStatus::isDirectory)
.map(FileStatus::getPath)
.collect(Collectors.toList());
List<JavaRDD<String>> inputRdd = new ArrayList<>();
subFolder.forEach(p -> inputRdd.add(sc.textFile(p.toUri().getRawPath())));
JavaRDD<String> union = sc.emptyRDD();
for (JavaRDD<String> item : inputRdd) {
union = union.union(item);
}
switch (entity) {
case "dataset":
union
.mapToPair(
(PairFunction<String, String, DLIDataset>) f -> {
final String id = getJPathString(IDJSONPATH, f);
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return new Tuple2<>(id, mapper.readValue(f, DLIDataset.class));
})
.reduceByKey(
(a, b) -> {
a.mergeFrom(b);
return a;
})
.map(
item -> {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(item._2());
})
.saveAsTextFile(targetPath, GzipCodec.class);
break;
case "publication":
union
.mapToPair(
(PairFunction<String, String, DLIPublication>) f -> {
final String id = getJPathString(IDJSONPATH, f);
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return new Tuple2<>(id, mapper.readValue(f, DLIPublication.class));
})
.reduceByKey(
(a, b) -> {
a.mergeFrom(b);
return a;
})
.map(
item -> {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(item._2());
})
.saveAsTextFile(targetPath, GzipCodec.class);
break;
case "unknown":
union
.mapToPair(
(PairFunction<String, String, DLIUnknown>) f -> {
final String id = getJPathString(IDJSONPATH, f);
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return new Tuple2<>(id, mapper.readValue(f, DLIUnknown.class));
})
.reduceByKey(
(a, b) -> {
a.mergeFrom(b);
return a;
})
.map(
item -> {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(item._2());
})
.saveAsTextFile(targetPath, GzipCodec.class);
break;
case "relation":
SparkSXGeneratePidSimlarity
.generateDataFrame(
spark, sc, inputPath.replace("/relation", ""), targetPath.replace("/relation", ""));
RDD<Relation> rdd = union
.mapToPair(
(PairFunction<String, String, Relation>) f -> {
final String source = getJPathString(SOURCEJSONPATH, f);
final String target = getJPathString(TARGETJSONPATH, f);
final String reltype = getJPathString(RELJSONPATH, f);
ObjectMapper mapper = new ObjectMapper();
mapper
.configure(
DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return new Tuple2<>(
DHPUtils
.md5(
String
.format(
"%s::%s::%s",
source.toLowerCase(),
reltype.toLowerCase(),
target.toLowerCase())),
mapper.readValue(f, Relation.class));
})
.reduceByKey(
(a, b) -> {
a.mergeFrom(b);
return a;
})
.map(Tuple2::_2)
.rdd();
spark
.createDataset(rdd, Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Overwrite)
.save(targetPath);
Dataset<Relation> rel_ds = spark.read().load(targetPath).as(Encoders.bean(Relation.class));
System.out.println("LOADING PATH :" + targetPath.replace("/relation", "") + "/pid_simRel");
Dataset<Relation> sim_ds = spark
.read()
.load(targetPath.replace("/relation", "") + "/pid_simRel")
.as(Encoders.bean(Relation.class));
Dataset<Relation> ids = sim_ds
.map(
(MapFunction<Relation, Relation>) relation -> {
final String type = StringUtils.substringBefore(relation.getSource(), "|");
relation
.setTarget(
String
.format(
"%s|%s",
type, StringUtils.substringAfter(relation.getTarget(), "::")));
return relation;
},
Encoders.bean(Relation.class));
final Dataset<Relation> firstJoin = rel_ds
.joinWith(ids, ids.col("target").equalTo(rel_ds.col("source")), "left_outer")
.map(
(MapFunction<Tuple2<Relation, Relation>, Relation>) s -> {
if (s._2() != null) {
s._1().setSource(s._2().getSource());
}
return s._1();
},
Encoders.bean(Relation.class));
Dataset<Relation> secondJoin = firstJoin
.joinWith(ids, ids.col("target").equalTo(firstJoin.col("target")), "left_outer")
.map(
(MapFunction<Tuple2<Relation, Relation>, Relation>) s -> {
if (s._2() != null) {
s._1().setTarget(s._2().getSource());
}
return s._1();
},
Encoders.bean(Relation.class));
secondJoin.write().mode(SaveMode.Overwrite).save(targetPath + "_fixed");
FileSystem fileSystem = FileSystem.get(sc.hadoopConfiguration());
fileSystem.delete(new Path(targetPath), true);
fileSystem.rename(new Path(targetPath + "_fixed"), new Path(targetPath));
}
}
public static String getJPathString(final String jsonPath, final String json) {
try {
Object o = JsonPath.read(json, jsonPath);
if (o instanceof String)
return (String) o;
if (o instanceof JSONArray && ((JSONArray) o).size() > 0)
return (String) ((JSONArray) o).get(0);
return "";
} catch (Exception e) {
return "";
}
}
}

View File

@ -1,72 +0,0 @@
package eu.dnetlib.dhp.sx.graph;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.sx.graph.parser.DatasetScholexplorerParser;
import eu.dnetlib.dhp.sx.graph.parser.PublicationScholexplorerParser;
import eu.dnetlib.scholexplorer.relation.RelationMapper;
import scala.Tuple2;
/**
* This Job read a sequential File containing XML stored in the aggregator and generates an RDD of heterogeneous
* entities like Dataset, Relation, Publication and Unknown
*/
public class SparkScholexplorerGraphImporter {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkScholexplorerGraphImporter.class
.getResourceAsStream(
"/eu/dnetlib/dhp/sx/graph/argumentparser/input_graph_scholix_parameters.json")));
parser.parseArgument(args);
final SparkSession spark = SparkSession
.builder()
.appName(SparkScholexplorerGraphImporter.class.getSimpleName())
.master(parser.get("master"))
.getOrCreate();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final String inputPath = parser.get("sourcePath");
RelationMapper relationMapper = RelationMapper.load();
sc
.sequenceFile(inputPath, IntWritable.class, Text.class)
.map(Tuple2::_2)
.map(Text::toString)
.repartition(500)
.flatMap(
(FlatMapFunction<String, Oaf>) record -> {
switch (parser.get("entity")) {
case "dataset":
final DatasetScholexplorerParser d = new DatasetScholexplorerParser();
return d.parseObject(record, relationMapper).iterator();
case "publication":
final PublicationScholexplorerParser p = new PublicationScholexplorerParser();
return p.parseObject(record, relationMapper).iterator();
default:
throw new IllegalArgumentException("wrong values of entities");
}
})
.map(
k -> {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(k);
})
.saveAsTextFile(parser.get("targetPath"), GzipCodec.class);
}
}

View File

@ -1,203 +0,0 @@
package eu.dnetlib.dhp.sx.graph
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown}
import eu.dnetlib.dhp.sx.graph.ebi.EBIAggregator
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
import org.apache.spark.sql.functions.col
object SparkSplitOafTODLIEntities {
def getKeyRelation(rel:Relation):String = {
s"${rel.getSource}::${rel.getRelType}::${rel.getTarget}"
}
def extract_dataset(spark:SparkSession, workingPath:String) :Unit = {
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf].repartition(4000)
val ebi_dataset:Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi/baseline_dataset_ebi").as[DLIDataset].repartition(1000)
OAFDataset
.filter(s => s != null && s.isInstanceOf[DLIDataset])
.map(s =>s.asInstanceOf[DLIDataset])
.union(ebi_dataset)
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
.map(p => p._2)
.repartition(2000)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/dataset")
}
def extract_publication(spark:SparkSession, workingPath:String) :Unit = {
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
val ebi_publication:Dataset[DLIPublication] = spark.read.load(s"$workingPath/ebi/baseline_publication_ebi").as[DLIPublication].repartition(1000)
OAFDataset
.filter(s => s != null && s.isInstanceOf[DLIPublication])
.map(s =>s.asInstanceOf[DLIPublication])
.union(ebi_publication)
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
.map(p => p._2)
.repartition(2000)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/publication")
}
def extract_unknown(spark:SparkSession, workingPath:String) :Unit = {
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown]
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
OAFDataset
.filter(s => s != null && s.isInstanceOf[DLIUnknown])
.map(s =>s.asInstanceOf[DLIUnknown])
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, unkEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDLIUnknownAggregator().toColumn)
.map(p => p._2)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/unknown")
}
def extract_ids(o:Oaf) :(String, String) = {
o match {
case p: DLIPublication =>
val prefix = StringUtils.substringBefore(p.getId, "|")
val original = StringUtils.substringAfter(p.getOriginalObjIdentifier, "::")
(p.getId, s"$prefix|$original")
case p: DLIDataset =>
val prefix = StringUtils.substringBefore(p.getId, "|")
val original = StringUtils.substringAfter(p.getOriginalObjIdentifier, "::")
(p.getId, s"$prefix|$original")
case _ =>null
}
}
def extract_relations(spark:SparkSession, workingPath:String) :Unit = {
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation]
import spark.implicits._
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
val ebi_relation:Dataset[Relation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[Relation].repartition(2000)
OAFDataset
.filter(o => o.isInstanceOf[Result])
.map(extract_ids)(Encoders.tuple(Encoders.STRING, Encoders.STRING))
.filter(r => r != null)
.where("_1 != _2")
.select(col("_1").alias("newId"), col("_2").alias("oldId"))
.distinct()
.map(f => IdReplace(f.getString(0), f.getString(1)))
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/id_replace")
OAFDataset
.filter(s => s != null && s.isInstanceOf[Relation])
.map(s =>s.asInstanceOf[Relation])
.union(ebi_relation)
.map(d => (getKeyRelation(d), d))(Encoders.tuple(Encoders.STRING, relEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getRelationAggregator().toColumn)
.map(p => p._2)
.repartition(4000)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation_unfixed")
val relations = spark.read.load(s"$workingPath/graph/relation_unfixed").as[Relation]
val ids = spark.read.load(s"$workingPath/graph/id_replace").as[IdReplace]
relations
.map(r => (r.getSource, r))(Encoders.tuple(Encoders.STRING, relEncoder))
.joinWith(ids, col("_1").equalTo(ids("oldId")), "left")
.map(i =>{
val r = i._1._2
if (i._2 != null)
{
val id = i._2.newId
r.setSource(id)
}
r
}).write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/rel_f_source")
val rel_source:Dataset[Relation] = spark.read.load(s"$workingPath/graph/rel_f_source").as[Relation]
rel_source
.map(r => (r.getTarget, r))(Encoders.tuple(Encoders.STRING, relEncoder))
.joinWith(ids, col("_1").equalTo(ids("oldId")), "left")
.map(i =>{
val r:Relation = i._1._2
if (i._2 != null)
{
val id = i._2.newId
r.setTarget(id)
}
r
}).write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation")
}
def main(args: Array[String]): Unit = {
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkSplitOafTODLIEntities.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json")))
val logger = LoggerFactory.getLogger(SparkSplitOafTODLIEntities.getClass)
parser.parseArgument(args)
val workingPath: String = parser.get("workingPath")
val entity:String = parser.get("entity")
logger.info(s"Working dir path = $workingPath")
val spark:SparkSession = SparkSession
.builder()
.appName(SparkSplitOafTODLIEntities.getClass.getSimpleName)
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.master(parser.get("master"))
.getOrCreate()
entity match {
case "publication" => extract_publication(spark, workingPath)
case "dataset" => extract_dataset(spark,workingPath)
case "relation" => extract_relations(spark, workingPath)
case "unknown" => extract_unknown(spark, workingPath)
}
}
}

View File

@ -1,73 +0,0 @@
package eu.dnetlib.dhp.sx.graph
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser}
import eu.dnetlib.scholexplorer.relation.RelationMapper
import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
/**
* This new version of the Job read a sequential File containing XML stored in the aggregator and generates a Dataset OAF of heterogeneous
* entities like Dataset, Relation, Publication and Unknown
*/
object SparkXMLToOAFDataset {
def main(args: Array[String]): Unit = {
val logger = LoggerFactory.getLogger(SparkXMLToOAFDataset.getClass)
val conf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkXMLToOAFDataset.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_graph_scholix_parameters.json")))
parser.parseArgument(args)
val spark =
SparkSession
.builder()
.config(conf)
.appName(SparkXMLToOAFDataset.getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val sc = spark.sparkContext
implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val datasetEncoder:Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
implicit val publicationEncoder:Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
implicit val relationEncoder:Encoder[Relation] = Encoders.kryo[Relation]
val relationMapper = RelationMapper.load
val inputPath: String = parser.get("sourcePath")
val entity: String = parser.get("entity")
val targetPath = parser.get("targetPath")
logger.info(s"Input path is $inputPath")
logger.info(s"Entity path is $entity")
logger.info(s"Target Path is $targetPath")
val scholixRdd:RDD[Oaf] = sc.sequenceFile(inputPath, classOf[IntWritable], classOf[Text])
.map(s => s._2.toString)
.flatMap(s => {
entity match {
case "publication" =>
val p = new PublicationScholexplorerParser
val l =p.parseObject(s, relationMapper)
if (l != null) l.asScala else List()
case "dataset" =>
val d = new DatasetScholexplorerParser
val l =d.parseObject(s, relationMapper)
if (l != null) l.asScala else List()
}
}).filter(s => s!= null)
spark.createDataset(scholixRdd).write.mode(SaveMode.Append).save(targetPath)
}
}

View File

@ -11,9 +11,9 @@ import org.slf4j.{Logger, LoggerFactory}
object SparkEBILinksToOaf {
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(SparkEBILinksToOaf.getClass)
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkEBILinksToOaf.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/ebi/ebi_to_df_params.json")))
val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/ebi/ebi_to_df_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession

View File

@ -0,0 +1,5 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the raw graph", "paramRequired": true}
]

View File

@ -0,0 +1,6 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"r", "paramLongName":"relationPath", "paramDescription": "the source Path", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the source Path", "paramRequired": true},
{"paramName":"e", "paramLongName":"entityPath", "paramDescription": "the path of the raw graph", "paramRequired": true}
]

View File

@ -1,121 +1,72 @@
<workflow-app name="Create Raw Graph Step 1: import Entities from aggregator to HDFS" xmlns="uri:oozie:workflow:0.5">
<workflow-app name="Create Raw Graph Step 1: extract Entities in raw graph" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>reuseContent</name>
<value>false</value>
<description>should import content from the aggregator or reuse a previous version</description>
</property>
<property>
<name>workingPath</name>
<name>sourcePath</name>
<description>the working dir base path</description>
</property>
<property>
<name>targetXMLPath</name>
<name>targetPath</name>
<description>the graph Raw base path</description>
</property>
<property>
<name>targetEntityPath</name>
<description>the graph Raw base path</description>
</property>
<property>
<name>format</name>
<description>the postgres URL to access to the database</description>
</property>
<property>
<name>layout</name>
<description>the user postgres</description>
</property>
<property>
<name>interpretation</name>
<description>the password postgres</description>
</property>
<property>
<name>dbhost</name>
<description>mongoDB url, example: mongodb://[username:password@]host[:port]</description>
</property>
<property>
<name>dbName</name>
<description>mongo database</description>
</property>
<property>
<name>entity</name>
<description>the entity type</description>
</property>
</parameters>
<start to="ReuseContent"/>
<decision name="ReuseContent">
<switch>
<case to="ImportEntitiesFromMongo">${wf:conf('reuseContent') eq false}</case>
<case to="ConvertXML2Entity">${wf:conf('reuseContent') eq true}</case>
<default to="ResetWorkingPath"/>
</switch>
</decision>
<start to="ResolveRelations"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ResetWorkingPath">
<fs>
<delete path='${workingPath}'/>
<mkdir path='${workingPath}/input'/>
</fs>
<ok to="ImportEntitiesFromMongo"/>
<error to="Kill"/>
</action>
<action name="ImportEntitiesFromMongo">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.sx.graph.ImportDataFromMongo</main-class>
<arg>-t</arg><arg>${targetXMLPath}</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-h</arg><arg>${dbhost}</arg>
<arg>-p</arg><arg>27017</arg>
<arg>-dn</arg><arg>${dbName}</arg>
<arg>-f</arg><arg>${format}</arg>
<arg>-l</arg><arg>${layout}</arg>
<arg>-i</arg><arg>${interpretation}</arg>
</java>
<ok to="ResetTargetPath"/>
<error to="Kill"/>
</action>
<action name="ResetTargetPath">
<fs>
<delete path='${targetEntityPath}'/>
</fs>
<ok to="ConvertXML2Entity"/>
<error to="Kill"/>
</action>
<action name="ConvertXML2Entity">
<action name="ExtractEntities">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<master>yarn</master>
<mode>cluster</mode>
<name>Import ${entity} and related entities</name>
<class>eu.dnetlib.dhp.sx.graph.SparkXMLToOAFDataset</class>
<name>Extract entities in raw graph</name>
<class>eu.dnetlib.dhp.sx.graph.SparkCreateInputGraph</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkExtraOPT}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.shuffle.partitions=2000
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>-mt</arg> <arg>yarn</arg>
<arg>--sourcePath</arg><arg>${targetXMLPath}</arg>
<arg>--targetPath</arg><arg>${workingPath}/input/OAFDataset</arg>
<arg>--entity</arg><arg>${entity}</arg>
<arg>--master</arg><arg>yarn</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg>
</spark>
<ok to="ResolveRelations"/>
<error to="Kill"/>
</action>
<action name="ResolveRelations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Resolve Relations in raw graph</name>
<class>eu.dnetlib.dhp.sx.graph.SparkResolveRelation</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.shuffle.partitions=3000
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--master</arg><arg>yarn</arg>
<arg>--relationPath</arg><arg>${targetPath}/extracted/relation</arg>
<arg>--workingPath</arg><arg>${targetPath}/resolved/</arg>
<arg>--entityPath</arg><arg>${targetPath}/dedup</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>