forked from antonis.lempesis/dnet-hadoop
added oozie job for DNET migration and implemented Spark job for extracting entities
This commit is contained in:
parent
fe93c709f1
commit
76ee85141a
|
@ -0,0 +1,56 @@
|
|||
package eu.dnetlib.dhp.migration;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
public class ExtractEntitiesFromHDFSJob {
|
||||
|
||||
|
||||
private static List<String> folderNames = Arrays.asList("db_entities", "oaf_entities", "odf_entities");
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils.toString(MigrateMongoMdstoresApplication.class.getResourceAsStream("/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
.appName(ExtractEntitiesFromHDFSJob.class.getSimpleName())
|
||||
.master(parser.get("master"))
|
||||
.getOrCreate();
|
||||
|
||||
final String sourcePath = parser.get("sourcePath");
|
||||
final String targetPath = parser.get("graphRawPath");
|
||||
final String entity = parser.get("entity");
|
||||
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
|
||||
|
||||
JavaRDD<String> inputRdd = sc.emptyRDD();
|
||||
|
||||
|
||||
folderNames.forEach(p -> inputRdd.union(
|
||||
sc.sequenceFile(sourcePath+"/"+p, Text.class, Text.class)
|
||||
.map(k -> new Tuple2<>(k._1().toString(), k._2().toString()))
|
||||
.filter(k -> isEntityType(k._1(), entity))
|
||||
.map(Tuple2::_2))
|
||||
);
|
||||
|
||||
inputRdd.saveAsTextFile(targetPath+"/"+entity);
|
||||
}
|
||||
|
||||
|
||||
private static boolean isEntityType(final String item, final String entity) {
|
||||
return StringUtils.substringAfter(item, ":").equalsIgnoreCase(entity);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
[
|
||||
{
|
||||
"paramName": "s",
|
||||
"paramLongName": "sourcePath",
|
||||
"paramDescription": "the HDFS source path which contains the sequential file",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "mt",
|
||||
"paramLongName": "master",
|
||||
"paramDescription": "should be local or yarn",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "g",
|
||||
"paramLongName": "graphRawPath",
|
||||
"paramDescription": "the path of the graph Raw in hdfs",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "e",
|
||||
"paramLongName": "entity",
|
||||
"paramDescription": "The entity to extract",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -0,0 +1,22 @@
|
|||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hdfsUser</name>
|
||||
<value>dnet</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -0,0 +1,282 @@
|
|||
<workflow-app name="import Entities from aggretor to HDFS" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>workingPath</name>
|
||||
<description>the base path to store hdfs file</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>graphRawPath</name>
|
||||
<description>the graph Raw base path</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>postgresURL</name>
|
||||
<description>the postgres URL to access to the database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresUser</name>
|
||||
<description>the user postgres</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresPassword</name>
|
||||
<description>the password postgres</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>mongourl</name>
|
||||
<description>mongoDB url, example: mongodb://[username:password@]host[:port]</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>mongoDb</name>
|
||||
<description>mongo database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="ResetWorkingPath"/>
|
||||
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="ResetWorkingPath">
|
||||
<fs>
|
||||
<delete path='${workingPath}'/>
|
||||
<mkdir path='${workingPath}'/>
|
||||
</fs>
|
||||
<ok to="ImportEntitiesFromPostgres"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportEntitiesFromPostgres">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.MigrateDbEntitiesApplication</main-class>
|
||||
<arg>-p</arg><arg>${workingPath}/db_entities</arg>
|
||||
<arg>-n</arg><arg>${nameNode}</arg>
|
||||
<arg>-u</arg><arg>${hdfsUser}</arg>
|
||||
<arg>-dburl</arg><arg>${postgresURL}</arg>
|
||||
<arg>-dbuser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-dbpasswd</arg><arg>${postgresPassword}</arg>
|
||||
</java>
|
||||
<ok to="ImportODFEntitiesFromMongoDB"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportODFEntitiesFromMongoDB">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication</main-class>
|
||||
<arg>-p</arg><arg>${workingPath}/odf_entities</arg>
|
||||
<arg>-n</arg><arg>${nameNode}</arg>
|
||||
<arg>-u</arg><arg>${hdfsUser}</arg>
|
||||
<arg>-mongourl</arg><arg>${mongourl}</arg>
|
||||
<arg>-db</arg><arg>${mongoDb}</arg>
|
||||
<arg>-f</arg><arg>ODF</arg>
|
||||
<arg>-l</arg><arg>store</arg>
|
||||
<arg>-i</arg><arg>cleaned</arg>
|
||||
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
||||
</java>
|
||||
<ok to="ImportOAFEntitiesFromMongoDB"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportOAFEntitiesFromMongoDB">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication</main-class>
|
||||
<arg>-p</arg><arg>${workingPath}/oaf_entities</arg>
|
||||
<arg>-n</arg><arg>${nameNode}</arg>
|
||||
<arg>-u</arg><arg>${hdfsUser}</arg>
|
||||
<arg>-mongourl</arg><arg>${mongourl}</arg>
|
||||
<arg>-db</arg><arg>${mongoDb}</arg>
|
||||
<arg>-f</arg><arg>OAF</arg>
|
||||
<arg>-l</arg><arg>store</arg>
|
||||
<arg>-i</arg><arg>cleaned</arg>
|
||||
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
||||
</java>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="ExtractPublication">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ExtractEntities: publication</name>
|
||||
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${workingPath}</arg>
|
||||
<arg>-g</arg><arg>${graphRawPath}/publication</arg>
|
||||
<arg>-e</arg><arg>publication</arg>
|
||||
</spark>
|
||||
<ok to="ExtractDataset"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="ExtractDataset">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ExtractEntities: dataset</name>
|
||||
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${workingPath}</arg>
|
||||
<arg>-g</arg><arg>${graphRawPath}/dataset</arg>
|
||||
<arg>-e</arg><arg>dataset</arg>
|
||||
</spark>
|
||||
<ok to="ExtractSoftware"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="ExtractSoftware">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ExtractEntities: software</name>
|
||||
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${workingPath}</arg>
|
||||
<arg>-g</arg><arg>${graphRawPath}/software</arg>
|
||||
<arg>-e</arg><arg>software</arg>
|
||||
</spark>
|
||||
<ok to="ExtractORP"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="ExtractORP">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ExtractEntities: otherresearchproduct</name>
|
||||
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${workingPath}</arg>
|
||||
<arg>-g</arg><arg>${graphRawPath}/otherresearchproduct</arg>
|
||||
<arg>-e</arg><arg>otherresearchproduct</arg>
|
||||
</spark>
|
||||
<ok to="ExtractDatasource"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="ExtractDatasource">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ExtractEntities: datasource</name>
|
||||
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${workingPath}</arg>
|
||||
<arg>-g</arg><arg>${graphRawPath}/datasource</arg>
|
||||
<arg>-e</arg><arg>datasource</arg>
|
||||
</spark>
|
||||
<ok to="ExtractOrganization"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="ExtractOrganization">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ExtractEntities: organization</name>
|
||||
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${workingPath}</arg>
|
||||
<arg>-g</arg><arg>${graphRawPath}/organization</arg>
|
||||
<arg>-e</arg><arg>organization</arg>
|
||||
</spark>
|
||||
<ok to="ExtractProject"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="ExtractProject">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ExtractEntities: project</name>
|
||||
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${workingPath}</arg>
|
||||
<arg>-g</arg><arg>${graphRawPath}/project</arg>
|
||||
<arg>-e</arg><arg>project</arg>
|
||||
</spark>
|
||||
<ok to="ExtractRelation"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="ExtractRelation">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ExtractEntities: relation</name>
|
||||
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${workingPath}</arg>
|
||||
<arg>-g</arg><arg>${graphRawPath}/relation</arg>
|
||||
<arg>-e</arg><arg>relation</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -18,13 +18,13 @@ public class GraphMappingUtils {
|
|||
public final static Map<String, Class> types = Maps.newHashMap();
|
||||
|
||||
static {
|
||||
types.put("datasource", Datasource.class);
|
||||
types.put("organization", Organization.class);
|
||||
types.put("datasource", Datasource.class);
|
||||
types.put("organization", Organization.class);
|
||||
types.put("project", Project.class);
|
||||
types.put("dataset", Dataset.class);
|
||||
types.put("otherresearchproduct", OtherResearchProduct.class);
|
||||
types.put("software", Software.class);
|
||||
types.put("publication", Publication.class);
|
||||
types.put("dataset", Dataset.class);
|
||||
types.put("otherresearchproduct", OtherResearchProduct.class);
|
||||
types.put("software", Software.class);
|
||||
types.put("publication", Publication.class);
|
||||
types.put("relation", Relation.class);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue