reorganizing content under dhp-workflows/dhp-graph-mapper

This commit is contained in:
Claudio Atzori 2020-03-26 19:44:19 +01:00
parent 77c4294924
commit 098fabab3f
26 changed files with 131 additions and 18 deletions

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dhp.graph;
package eu.dnetlib.dhp.graph.openaire;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;

View File

@ -1,12 +1,11 @@
package eu.dnetlib.dhp.graph;
package eu.dnetlib.dhp.graph.scholexplorer;
import com.mongodb.*;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageType;
import eu.dnetlib.dhp.graph.openaire.SparkGraphImporterJob;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

View File

@ -2,7 +2,7 @@ package eu.dnetlib.dhp.graph.scholexplorer;
import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.graph.SparkGraphImporterJob;
import eu.dnetlib.dhp.graph.openaire.SparkGraphImporterJob;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.compress.GzipCodec;

View File

@ -1,12 +1,8 @@
package eu.dnetlib.dhp.graph.scholexplorer;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.graph.SparkGraphImporterJob;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

View File

@ -2,7 +2,7 @@ package eu.dnetlib.dhp.graph.scholexplorer;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.graph.SparkGraphImporterJob;
import eu.dnetlib.dhp.graph.openaire.SparkGraphImporterJob;
import eu.dnetlib.dhp.graph.scholexplorer.parser.DatasetScholexplorerParser;
import eu.dnetlib.dhp.graph.scholexplorer.parser.PublicationScholexplorerParser;
import eu.dnetlib.dhp.schema.oaf.Oaf;

View File

@ -4,7 +4,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.graph.SparkGraphImporterJob;
import eu.dnetlib.dhp.graph.openaire.SparkGraphImporterJob;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset;
import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication;
@ -12,7 +12,6 @@ import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown;
import eu.dnetlib.dhp.utils.DHPUtils;
import net.minidev.json.JSONArray;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -25,10 +24,7 @@ import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.*;
import scala.Tuple2;
import scala.collection.JavaConverters;
import sun.rmi.log.ReliableLog;
import javax.xml.crypto.Data;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

View File

@ -0,0 +1,30 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hive_jdbc_url</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hive_db_name</name>
<value>openaire</value>
</property>
</configuration>

View File

@ -0,0 +1,90 @@
<workflow-app name="import_graph_as_hive_DB" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>hive_db_name</name>
<description>the target hive database name</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
</configuration>
</global>
<start to="MapGraphAsHiveDB"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="MapGraphAsHiveDB">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>MapGraphAsHiveDB</name>
<class>eu.dnetlib.dhp.graph.SparkGraphImporterJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.sql.warehouse.dir="/user/hive/warehouse"
</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_db_name</arg><arg>${hive_db_name}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
</spark>
<ok to="PostProcessing"/>
<error to="Kill"/>
</action>
<action name="PostProcessing">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>${hive_metastore_uris}</value>
</property>
</configuration>
<jdbc-url>${hive_jdbc_url}/${hive_db_name}</jdbc-url>
<script>lib/scripts/postprocessing.sql</script>
<param>hive_db_name=${hive_db_name}</param>
</hive2>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -49,7 +49,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>MapGraphAsHiveDB</name>
<class>eu.dnetlib.dhp.graph.SparkGraphImporterJob</class>
<class>eu.dnetlib.dhp.graph.openaire.SparkGraphImporterJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}

View File

@ -55,7 +55,7 @@
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.graph.ImportDataFromMongo</main-class>
<main-class>eu.dnetlib.dhp.graph.scholexplorer.ImportDataFromMongo</main-class>
<arg>-t</arg><arg>${targetPath}</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-u</arg><arg>${user}</arg>

View File

@ -1,5 +1,7 @@
package eu.dnetlib.dhp.graph;
import eu.dnetlib.dhp.graph.openaire.GraphMappingUtils;
import eu.dnetlib.dhp.graph.openaire.SparkGraphImporterJob;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Assertions;