forked from D-Net/dnet-hadoop

refactor class name and workflow name for graph mapper, added javadoc

commit a9935f80d4 (parent 673e744649)
@@ -1,4 +1,4 @@
-package eu.dnetlib.dedup;
+package eu.dnetlib.dedup.sx;
 
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -7,14 +7,8 @@ import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.utils.DHPUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.Optional;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.*;
 import scala.Tuple2;
 
@@ -1,4 +1,4 @@
-package eu.dnetlib.dedup;
+package eu.dnetlib.dedup.sx;
 
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -15,7 +15,6 @@ import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.*;
 import scala.Tuple2;
@@ -122,7 +122,7 @@
             <master>yarn-cluster</master>
             <mode>cluster</mode>
             <name>Propagate Dedup Relations</name>
-            <class>eu.dnetlib.dedup.SparkPropagateRelationsJob</class>
+            <class>eu.dnetlib.dedup.sx.SparkPropagateRelationsJob</class>
             <jar>dhp-dedup-${projectVersion}.jar</jar>
             <spark-opts>
                 --executor-memory ${sparkExecutorMemory}
@@ -146,7 +146,7 @@
             <master>yarn-cluster</master>
             <mode>cluster</mode>
             <name>Update ${entity} and add DedupRecord</name>
-            <class>eu.dnetlib.dedup.SparkUpdateEntityJob</class>
+            <class>eu.dnetlib.dedup.sx.SparkUpdateEntityJob</class>
             <jar>dhp-dedup-${projectVersion}.jar</jar>
             <spark-opts>
                 --executor-memory ${sparkExecutorMemory}
@@ -1,15 +0,0 @@
-package eu.dnetlib.dhp.graph.scholexplorer;
-
-
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.spark.api.java.function.MapFunction;
-
-public class TargetFunction implements MapFunction<Relation, Relation> {
-    @Override
-    public Relation call(Relation relation) throws Exception {
-        final String type = StringUtils.substringBefore(relation.getSource(), "|");
-        relation.setTarget(String.format("%s|%s", type, StringUtils.substringAfter(relation.getTarget(),"::")));
-        return relation;
-    }
-}
@@ -1,4 +1,4 @@
-package eu.dnetlib.dhp.graph.scholexplorer;
+package eu.dnetlib.dhp.graph.sx;
 
 import com.mongodb.DBObject;
 import com.mongodb.MongoClient;
@@ -16,7 +16,6 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.bson.Document;
 import org.bson.conversions.Bson;
-
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -26,14 +25,52 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+/**
+ * This job is responsible for collecting data from the mongo database
+ * and storing it in a sequence file on HDFS.
+ * The mongo database describes each MDStore in two collections:
+ *  - metadata:
+ *      contains info such as id, format, layout, interpretation
+ *  - metadataManager:
+ *      contains info such as id, mongoCollectionName
+ * From the metadata collection we filter the ids by format, layout and interpretation;
+ * from the metadataManager we get the current mongo collection name that contains the metadata XML
+ * (see the function getCurrentId).
+ *
+ * This job is invoked once for each triple we want to import,
+ * and for each triple it generates a sequence file of XML.
+ */
 public class ImportDataFromMongo {
+    /**
+     * It requires in input the parameters described in the file eu/dnetlib/dhp/sx/graph/argumentparser/import_from_mongo_parameters.json:
+     *  - the name node
+     *  - the path where to store the HDFS file
+     *  - the mongo host
+     *  - the mongo port
+     *  - the metadata format to import
+     *  - the metadata layout to import
+     *  - the metadata interpretation to import
+     *  - the mongo database name
+     *
+     * These parameters are encoded into args.
+     *
+     * @param args
+     * @throws Exception
+     */
     public static void main(String[] args) throws Exception {
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(
                 IOUtils.toString(
                         ImportDataFromMongo.class.getResourceAsStream(
-                                "/eu/dnetlib/dhp/graph/import_from_mongo_parameters.json")));
+                                "/eu/dnetlib/dhp/sx/graph/argumentparser/import_from_mongo_parameters.json")));
         parser.parseArgument(args);
         final int port = Integer.parseInt(parser.get("dbport"));
         final String host = parser.get("dbhost");
@@ -43,10 +80,7 @@ public class ImportDataFromMongo {
         final String interpretation = parser.get("interpretation");
-
         final String dbName = parser.get("dbName");
-
-
         final MongoClient client = new MongoClient(host, port);
 
         MongoDatabase database = client.getDatabase(dbName);
 
         MongoCollection<Document> metadata = database.getCollection("metadata");
@@ -55,6 +89,8 @@ public class ImportDataFromMongo {
         final List<String> ids = new ArrayList<>();
         metadata.find((Bson) query).forEach((Consumer<Document>) document -> ids.add(document.getString("mdId")));
         List<String> databaseId = ids.stream().map(it -> getCurrentId(it, metadataManager)).filter(Objects::nonNull).collect(Collectors.toList());
+
+
         final String hdfsuri = parser.get("namenode");
         // ====== Init HDFS File System Object
         Configuration conf = new Configuration();
@@ -64,8 +100,6 @@ public class ImportDataFromMongo {
         conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
         conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
 
-        System.setProperty("HADOOP_USER_NAME", parser.get("user"));
-        System.setProperty("hadoop.home.dir", "/");
         FileSystem.get(URI.create(hdfsuri), conf);
         Path hdfswritepath = new Path(parser.get("targetPath"));
 
@@ -92,13 +126,17 @@
                             throw new RuntimeException(e);
                         }
                     }
 
             );
         });
     }
 
+    /**
+     * Returns the name of the mongo collection for a given MDStore ID.
+     * @param mdId the id of the MDStore
+     * @param metadataManager the metadataManager collection on mongo which contains this information
+     * @return the current mongo collection name
+     */
     private static String getCurrentId(final String mdId, final MongoCollection<Document> metadataManager) {
         FindIterable<Document> result = metadataManager.find((Bson) QueryBuilder.start("mdId").is(mdId).get());
         final Document item = result.first();
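For reference, a minimal standalone sketch of the mongo lookup pattern described in the javadoc above: filter the metadata collection by the (format, layout, interpretation) triple, then resolve each MDStore id through metadataManager. The connection values, the triple values ("DMF", "store", "cleaned") and the field name "currentId" are illustrative assumptions, not taken from this commit.

import com.mongodb.MongoClient;
import com.mongodb.QueryBuilder;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;
import org.bson.conversions.Bson;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class MongoMdStoreLookupSketch {

    public static void main(String[] args) {
        // Illustrative connection values: replace with the real dbhost/dbport/dbName parameters.
        final MongoClient client = new MongoClient("localhost", 27017);
        try {
            final MongoDatabase database = client.getDatabase("mdstore");

            // "metadata" describes each MDStore (id, format, layout, interpretation).
            final MongoCollection<Document> metadata = database.getCollection("metadata");
            // "metadataManager" maps an MDStore id to the collection holding its XML records.
            final MongoCollection<Document> metadataManager = database.getCollection("metadataManager");

            // Filter the MDStore ids by the (format, layout, interpretation) triple; the values are made up.
            final Bson query = (Bson) QueryBuilder.start("format").is("DMF")
                    .and("layout").is("store")
                    .and("interpretation").is("cleaned").get();

            final List<String> ids = new ArrayList<>();
            metadata.find(query).forEach((Consumer<Document>) d -> ids.add(d.getString("mdId")));

            // Resolve each MDStore id to its current mongo collection name (the getCurrentId step).
            for (final String mdId : ids) {
                final FindIterable<Document> result =
                        metadataManager.find((Bson) QueryBuilder.start("mdId").is(mdId).get());
                final Document item = result.first();
                // "currentId" is an assumed field name for the resolved collection.
                System.out.println(mdId + " -> " + (item != null ? item.getString("currentId") : "not found"));
            }
        } finally {
            client.close();
        }
    }
}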
@@ -1,8 +1,7 @@
-package eu.dnetlib.dhp.graph.scholexplorer;
+package eu.dnetlib.dhp.graph.sx;
 
 import com.jayway.jsonpath.JsonPath;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.oa.graph.SparkGraphImporterJob;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.io.compress.GzipCodec;
@@ -16,6 +15,27 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 
+/**
+ * This job extracts one typology of entity and stores it in a new RDD.
+ * It is called several times, once for each file generated by the job {@link ImportDataFromMongo},
+ * and it stores the new RDD in a path of the form:
+ *  extractedEntities/entity/version1
+ *
+ * At the end of this process we will have:
+ *  extractedEntities/dataset/version1
+ *  extractedEntities/dataset/version2
+ *  extractedEntities/dataset/...
+ *  extractedEntities/publication/version1
+ *  extractedEntities/publication/version2
+ *  extractedEntities/publication/...
+ *  extractedEntities/unknown/version1
+ *  extractedEntities/unknown/version2
+ *  extractedEntities/unknown/...
+ *  extractedEntities/relation/version1
+ *  extractedEntities/relation/version2
+ *  extractedEntities/relation/...
+ */
 public class SparkExtractEntitiesJob {
     final static String IDJSONPATH = "$.id";
     final static String SOURCEJSONPATH = "$.source";
@@ -27,7 +47,7 @@ public class SparkExtractEntitiesJob {
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(
                 IOUtils.toString(
                         SparkExtractEntitiesJob.class.getResourceAsStream(
-                                "/eu/dnetlib/dhp/graph/input_extract_entities_parameters.json")));
+                                "/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json")));
         parser.parseArgument(args);
         final SparkSession spark = SparkSession
                 .builder()
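A rough sketch of the extract-and-store pattern the javadoc above describes: read the JSON records, keep one typology, and write it gzip-compressed under extractedEntities/<entity>/<version>. The id-prefix predicate and the paths are placeholders, not the selection logic actually used by SparkExtractEntitiesJob.

import com.jayway.jsonpath.JsonPath;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class ExtractEntitySketch {

    public static void main(String[] args) {
        final SparkSession spark = SparkSession.builder()
                .appName(ExtractEntitySketch.class.getSimpleName())
                .master("local[*]")  // assumption: local run; the real job takes the master from its -mt argument
                .getOrCreate();
        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        final String inputPath = "/tmp/input/version1";                       // placeholder path
        final String targetPath = "/tmp/extractedEntities/dataset/version1";  // placeholder path

        // Keep only records whose $.id matches the wanted typology; the "60|" prefix is illustrative only.
        final JavaRDD<String> extracted = sc.textFile(inputPath)
                .filter(line -> {
                    try {
                        final String id = JsonPath.read(line, "$.id");
                        return id != null && id.startsWith("60|");
                    } catch (Exception e) {
                        return false; // malformed record: skip it
                    }
                });

        // One folder per typology and version, compressed, as the javadoc layout suggests.
        extracted.saveAsTextFile(targetPath, GzipCodec.class);

        spark.stop();
    }
}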
@@ -1,4 +1,4 @@
-package eu.dnetlib.dhp.graph.scholexplorer;
+package eu.dnetlib.dhp.graph.sx;
 
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -12,13 +12,24 @@ import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import scala.Tuple2;
 
-public class SparkScholexplorerGenerateSimRel {
+/**
+ * In some cases the identifier generated for an entity in {@link SparkExtractEntitiesJob} is different from the identifier
+ * associated by the aggregator, which means that some relations point to a missing identifier.
+ * To avoid this problem we store in the model both the id and the originalObjIdentifier.
+ * This job extracts this pair and creates a Similar relation that will be used in SparkMergeEntities.
+ */
+public class SparkSXGeneratePidSimlarity {
 
     final static String IDJSONPATH = "$.id";
     final static String OBJIDPATH = "$.originalObjIdentifier";
 
 
 
 
     public static void generateDataFrame(final SparkSession spark, final JavaSparkContext sc, final String inputPath, final String targetPath) {
 
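A simplified sketch of what the pid-similarity extraction could look like, using the two JSONPaths declared above: emit the (originalObjIdentifier, id) pairs that differ. The real generateDataFrame builds a Relation dataset saved under <targetPath>/pid_simRel; here the output is left as a plain pair RDD to keep the example short.

import com.jayway.jsonpath.JsonPath;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class PidSimilaritySketch {

    final static String IDJSONPATH = "$.id";
    final static String OBJIDPATH = "$.originalObjIdentifier";

    // Emits (originalObjIdentifier, id) for records where the aggregator identifier differs
    // from the generated one; downstream these pairs can be turned into "similar" relations.
    public static JavaPairRDD<String, String> generatePidPairs(final JavaSparkContext sc, final String inputPath) {
        return sc.textFile(inputPath)
                .mapToPair((PairFunction<String, String, String>) json -> {
                    try {
                        final String id = JsonPath.read(json, IDJSONPATH);
                        final String originalObjId = JsonPath.read(json, OBJIDPATH);
                        return new Tuple2<>(originalObjId, id);
                    } catch (Exception e) {
                        return new Tuple2<>(null, null); // malformed record: dropped by the filter below
                    }
                })
                .filter(t -> t._1() != null && t._2() != null && !t._1().equals(t._2()));
    }
}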
@@ -1,4 +1,4 @@
-package eu.dnetlib.dhp.graph.scholexplorer;
+package eu.dnetlib.dhp.graph.sx;
 
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -11,6 +11,7 @@ import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown;
 import eu.dnetlib.dhp.utils.DHPUtils;
 import net.minidev.json.JSONArray;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -32,7 +33,27 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
-public class SparkScholexplorerMergeEntitiesJob {
+/**
+ * This job is responsible for the creation of the RAW Graph.
+ * It is applied to the different entities generated by {@link SparkExtractEntitiesJob}.
+ * For dataset, publication and unknown entities
+ * we group all the entities of the same type by their identifier,
+ * and then in the reduce phase we merge all the entities.
+ * Merge means:
+ *  - merge all the metadata
+ *  - merge the collectedFrom values
+ *
+ * Relations need different handling:
+ *  - Phase 1: map-reduce jobs
+ *    Map: get all relations and emit a key constructed as (source, relType, target) together with the relation itself
+ *    Reduce: merge all relations
+ * As described in the javadoc of {@link SparkSXGeneratePidSimlarity}, we take the dataset of pid relations
+ * and, joining by source and target, we replace the wrong identifiers in the relations with the correct ones.
+ * At the end we write out the new Dataset of Relation.
+ */
+public class SparkScholexplorerCreateRawGraphJob {
 
     final static String IDJSONPATH = "$.id";
     final static String SOURCEJSONPATH = "$.source";
@@ -44,22 +65,20 @@ public class SparkScholexplorerMergeEntitiesJob {
 
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(
                 IOUtils.toString(
-                        SparkScholexplorerMergeEntitiesJob.class.getResourceAsStream(
-                                "/eu/dnetlib/dhp/graph/merge_entities_scholix_parameters.json")));
+                        SparkScholexplorerCreateRawGraphJob.class.getResourceAsStream(
+                                "/eu/dnetlib/dhp/sx/graph/argumentparser/merge_entities_scholix_parameters.json")));
         parser.parseArgument(args);
         final SparkSession spark = SparkSession
                 .builder()
                 .config(new SparkConf()
                         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))
-                .appName(SparkScholexplorerMergeEntitiesJob.class.getSimpleName())
+                .appName(SparkScholexplorerCreateRawGraphJob.class.getSimpleName())
                 .master(parser.get("master"))
                 .getOrCreate();
         final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
         final String inputPath = parser.get("sourcePath");
         final String targetPath = parser.get("targetPath");
         final String entity = parser.get("entity");
-
-
         FileSystem fs = FileSystem.get(sc.sc().hadoopConfiguration());
         List<Path> subFolder = Arrays.stream(fs.listStatus(new Path(inputPath))).filter(FileStatus::isDirectory).map(FileStatus::getPath).collect(Collectors.toList());
         List<JavaRDD<String>> inputRdd = new ArrayList<>();
@@ -113,7 +132,9 @@
                 break;
             case "relation":
 
-                SparkScholexplorerGenerateSimRel.generateDataFrame(spark, sc, inputPath.replace("/relation",""),targetPath.replace("/relation","") );
+
+                SparkSXGeneratePidSimlarity.generateDataFrame(spark, sc, inputPath.replace("/relation",""),targetPath.replace("/relation","") );
+
                 RDD<Relation> rdd = union.mapToPair((PairFunction<String, String, Relation>) f -> {
                     final String source = getJPathString(SOURCEJSONPATH, f);
                     final String target = getJPathString(TARGETJSONPATH, f);
@@ -132,9 +153,13 @@
                 System.out.println("LOADING PATH :"+targetPath.replace("/relation","")+"/pid_simRel");
                 Dataset<Relation>sim_ds =spark.read().load(targetPath.replace("/relation","")+"/pid_simRel").as(Encoders.bean(Relation.class));
 
-                TargetFunction tf = new TargetFunction();
-
-                Dataset<Relation> ids = sim_ds.map(tf, Encoders.bean(Relation.class));
+                Dataset<Relation> ids = sim_ds.map((MapFunction<Relation, Relation>) relation ->
+                        {
+                            final String type = StringUtils.substringBefore(relation.getSource(), "|");
+                            relation.setTarget(String.format("%s|%s", type, StringUtils.substringAfter(relation.getTarget(),"::")));
+                            return relation;
+                        }
+                , Encoders.bean(Relation.class));
 
 
                 final Dataset<Relation> firstJoin = rel_ds
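The inline MapFunction above takes over the work of the deleted TargetFunction. A tiny standalone check of the string manipulation it performs, with made-up identifiers:

import org.apache.commons.lang3.StringUtils;

public class FixRelationTargetDemo {

    public static void main(String[] args) {
        // Made-up identifiers, only to show the transformation of the map function above.
        final String source = "50|scholix_____::a1b2c3";
        final String target = "unresolved::d4e5f6";

        // Take the type prefix of the source (everything before '|') ...
        final String type = StringUtils.substringBefore(source, "|");
        // ... and attach it to the part of the target that follows "::".
        final String fixedTarget = String.format("%s|%s", type, StringUtils.substringAfter(target, "::"));

        System.out.println(fixedTarget); // prints 50|d4e5f6
    }
}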
@@ -1,9 +1,9 @@
-package eu.dnetlib.dhp.graph.scholexplorer;
+package eu.dnetlib.dhp.graph.sx;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.graph.scholexplorer.parser.DatasetScholexplorerParser;
-import eu.dnetlib.dhp.graph.scholexplorer.parser.PublicationScholexplorerParser;
+import eu.dnetlib.dhp.graph.sx.parser.DatasetScholexplorerParser;
+import eu.dnetlib.dhp.graph.sx.parser.PublicationScholexplorerParser;
 import eu.dnetlib.dhp.schema.oaf.Oaf;
 import eu.dnetlib.scholexplorer.relation.RelationMapper;
 import org.apache.commons.io.IOUtils;
@@ -15,6 +15,12 @@ import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.sql.SparkSession;
 import scala.Tuple2;
 
+
+/**
+ * This job reads a sequence file containing the XML stored in the aggregator
+ * and generates an RDD of heterogeneous entities (Dataset, Relation, Publication and Unknown).
+ */
+
 public class SparkScholexplorerGraphImporter {
 
     public static void main(String[] args) throws Exception {
@@ -22,7 +28,7 @@ public class SparkScholexplorerGraphImporter {
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(
                 IOUtils.toString(
                         SparkScholexplorerGraphImporter.class.getResourceAsStream(
-                                "/eu/dnetlib/dhp/graph/input_graph_scholix_parameters.json")));
+                                "/eu/dnetlib/dhp/sx/graph/argumentparser/input_graph_scholix_parameters.json")));
 
         parser.parseArgument(args);
         final SparkSession spark = SparkSession
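A minimal sketch of the first half of this importer: reading the aggregator sequence file into an RDD of XML strings. It assumes IntWritable keys and Text values and stops before the parsing step, since the parser API is not shown in this diff.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class SequenceFileXmlReadSketch {

    public static void main(String[] args) {
        final SparkSession spark = SparkSession.builder()
                .appName(SequenceFileXmlReadSketch.class.getSimpleName())
                .master("local[*]")  // assumption: local run for the sketch
                .getOrCreate();
        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        final String inputPath = "/tmp/aggregator/records.seq"; // placeholder path

        // The aggregator dump is read as a SequenceFile whose values carry the XML records.
        final JavaRDD<String> xmlRecords = sc.sequenceFile(inputPath, IntWritable.class, Text.class)
                .map(tuple -> tuple._2().toString());

        // A real run would flatMap each record through DatasetScholexplorerParser /
        // PublicationScholexplorerParser to obtain Oaf entities; here we only count the records.
        System.out.println("XML records read: " + xmlRecords.count());

        spark.stop();
    }
}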
@@ -1,4 +1,4 @@
-package eu.dnetlib.dhp.graph.scholexplorer.parser;
+package eu.dnetlib.dhp.graph.sx.parser;
 
 
 import eu.dnetlib.dhp.parser.utility.VtdUtilityParser;
@@ -1,4 +1,4 @@
-package eu.dnetlib.dhp.graph.scholexplorer.parser;
+package eu.dnetlib.dhp.graph.sx.parser;
 
 import com.ximpleware.AutoPilot;
 import com.ximpleware.VTDGen;
@@ -13,7 +13,6 @@ import eu.dnetlib.dhp.parser.utility.VtdUtilityParser.Node;
 import eu.dnetlib.scholexplorer.relation.RelInfo;
 import eu.dnetlib.scholexplorer.relation.RelationMapper;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -1,4 +1,4 @@
-package eu.dnetlib.dhp.graph.scholexplorer.parser;
+package eu.dnetlib.dhp.graph.sx.parser;
 
 import com.ximpleware.AutoPilot;
 import com.ximpleware.VTDGen;
@@ -1,90 +0,0 @@
-<workflow-app name="import_infospace_graph" xmlns="uri:oozie:workflow:0.5">
-
-    <parameters>
-        <property>
-            <name>sourcePath</name>
-            <description>the source path</description>
-        </property>
-        <property>
-            <name>hive_db_name</name>
-            <description>the target hive database name</description>
-        </property>
-        <property>
-            <name>sparkDriverMemory</name>
-            <description>memory for driver process</description>
-        </property>
-        <property>
-            <name>sparkExecutorMemory</name>
-            <description>memory for individual executor</description>
-        </property>
-        <property>
-            <name>sparkExecutorCores</name>
-            <description>number of cores used by single executor</description>
-        </property>
-    </parameters>
-
-    <global>
-        <job-tracker>${jobTracker}</job-tracker>
-        <name-node>${nameNode}</name-node>
-        <configuration>
-            <property>
-                <name>mapreduce.job.queuename</name>
-                <value>${queueName}</value>
-            </property>
-            <property>
-                <name>oozie.launcher.mapred.job.queue.name</name>
-                <value>${oozieLauncherQueueName}</value>
-            </property>
-        </configuration>
-    </global>
-
-    <start to="MapGraphAsHiveDB"/>
-
-    <kill name="Kill">
-        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
-    </kill>
-
-    <action name="MapGraphAsHiveDB">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn</master>
-            <mode>cluster</mode>
-            <name>MapGraphAsHiveDB</name>
-            <class>eu.dnetlib.dhp.oa.graph.SparkGraphImporterJob</class>
-            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-memory ${sparkExecutorMemory}
-                --executor-cores ${sparkExecutorCores}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
-                --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
-                --conf spark.sql.warehouse.dir="/user/hive/warehouse"
-            </spark-opts>
-            <arg>-mt</arg> <arg>yarn</arg>
-            <arg>-s</arg><arg>${sourcePath}</arg>
-            <arg>-db</arg><arg>${hive_db_name}</arg>
-            <arg>-h</arg><arg>${hive_metastore_uris}</arg>
-        </spark>
-        <ok to="PostProcessing"/>
-        <error to="Kill"/>
-    </action>
-
-    <action name="PostProcessing">
-        <hive2 xmlns="uri:oozie:hive2-action:0.1">
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <configuration>
-                <property>
-                    <name>hive.metastore.uris</name>
-                    <value>${hive_metastore_uris}</value>
-                </property>
-            </configuration>
-            <jdbc-url>${hive_jdbc_url}/${hive_db_name}</jdbc-url>
-            <script>lib/scripts/postprocessing.sql</script>
-            <param>hive_db_name=${hive_db_name}</param>
-        </hive2>
-        <ok to="End"/>
-        <error to="Kill"/>
-    </action>
-
-    <end name="End"/>
-</workflow-app>
@@ -1,10 +0,0 @@
-<configuration>
-    <property>
-        <name>oozie.use.system.libpath</name>
-        <value>true</value>
-    </property>
-    <property>
-        <name>oozie.action.sharelib.for.spark</name>
-        <value>spark2</value>
-    </property>
-</configuration>
@@ -1,12 +1,11 @@
 [
   {"paramName":"n", "paramLongName":"namenode", "paramDescription": "the name node", "paramRequired": true},
-  {"paramName":"u", "paramLongName":"user", "paramDescription": "the name node", "paramRequired": true},
-  {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the name node", "paramRequired": true},
+  {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path where to store the HDFS file", "paramRequired": true},
   {"paramName":"h", "paramLongName":"dbhost", "paramDescription": "the mongo host", "paramRequired": true},
   {"paramName":"p", "paramLongName":"dbport", "paramDescription": "the mongo port", "paramRequired": true},
   {"paramName":"f", "paramLongName":"format", "paramDescription": "the metadata format to import", "paramRequired": true},
   {"paramName":"l", "paramLongName":"layout", "paramDescription": "the metadata layout to import", "paramRequired": true},
   {"paramName":"i", "paramLongName":"interpretation", "paramDescription": "the metadata interpretation to import", "paramRequired": true},
-  {"paramName":"dn", "paramLongName":"dbName", "paramDescription": "the database Name", "paramRequired": true}
+  {"paramName":"dn", "paramLongName":"dbName", "paramDescription": "the mongo database Name", "paramRequired": true}
 
 ]
@@ -1,4 +1,4 @@
-<workflow-app name="import Entities from aggretor to HDFS" xmlns="uri:oozie:workflow:0.5">
+<workflow-app name="Create Raw Graph Step 1: import Entities from aggregator to HDFS" xmlns="uri:oozie:workflow:0.5">
     <parameters>
         <property>
             <name>workingPath</name>
@@ -55,7 +55,7 @@
         <java>
             <job-tracker>${jobTracker}</job-tracker>
             <name-node>${nameNode}</name-node>
-            <main-class>eu.dnetlib.dhp.graph.scholexplorer.ImportDataFromMongo</main-class>
+            <main-class>eu.dnetlib.dhp.graph.sx.ImportDataFromMongo</main-class>
             <arg>-t</arg><arg>${targetPath}</arg>
             <arg>-n</arg><arg>${nameNode}</arg>
             <arg>-u</arg><arg>${user}</arg>
@@ -1,4 +1,4 @@
-<workflow-app name="import_infospace_graph" xmlns="uri:oozie:workflow:0.5">
+<workflow-app name="Create Raw Graph Step 2: Map XML to OAF Entities" xmlns="uri:oozie:workflow:0.5">
     <parameters>
         <property>
             <name>sourcePath</name>
@@ -54,7 +54,7 @@
             <master>yarn-cluster</master>
             <mode>cluster</mode>
             <name>Extract ${entities}</name>
-            <class>eu.dnetlib.dhp.graph.scholexplorer.SparkExtractEntitiesJob</class>
+            <class>eu.dnetlib.dhp.graph.sx.SparkExtractEntitiesJob</class>
            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
            <spark-opts>
                --executor-memory ${sparkExecutorMemory}
@@ -1,4 +1,4 @@
-<workflow-app name="Infospace Merge Entities" xmlns="uri:oozie:workflow:0.5">
+<workflow-app name="Create Raw Graph Final Step: Construct the Scholexplorer Raw Graph" xmlns="uri:oozie:workflow:0.5">
     <parameters>
         <property>
             <name>sourcePath</name>
@@ -45,7 +45,7 @@
             <master>yarn-cluster</master>
             <mode>cluster</mode>
             <name>Merge ${entity}</name>
-            <class>eu.dnetlib.dhp.graph.scholexplorer.SparkScholexplorerMergeEntitiesJob</class>
+            <class>eu.dnetlib.dhp.graph.sx.SparkScholexplorerCreateRawGraphJob</class>
             <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
             <spark-opts> --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT}</spark-opts>
             <arg>-mt</arg> <arg>yarn-cluster</arg>
@@ -1,9 +1,9 @@
-package eu.dnetlib.dhp.graph.scholexplorer;
+package eu.dnetlib.dhp.graph.sx;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
-import eu.dnetlib.dhp.graph.scholexplorer.parser.DatasetScholexplorerParser;
+import eu.dnetlib.dhp.graph.sx.parser.DatasetScholexplorerParser;
 import eu.dnetlib.dhp.schema.oaf.Oaf;
 import eu.dnetlib.scholexplorer.relation.RelationMapper;
 import org.apache.commons.io.IOUtils;
@@ -1,4 +1,4 @@
-package eu.dnetlib.dhp.graph.scholexplorer;
+package eu.dnetlib.dhp.graph.sx;
 
 
 
@@ -1,4 +1,4 @@
-package eu.dnetlib.dhp.graph.scholexplorer;
+package eu.dnetlib.dhp.graph.sx;
 
 
 