Merge branch 'master' of code-repo.d4science.org:D-Net/dnet-hadoop

This commit is contained in:
Michele Artini 2020-03-04 14:50:59 +01:00
commit 7a2a466161
5 changed files with 62 additions and 31 deletions

View File

@ -18,13 +18,13 @@ public class GraphMappingUtils {
public final static Map<String, Class> types = Maps.newHashMap(); public final static Map<String, Class> types = Maps.newHashMap();
static { static {
types.put("datasource", Datasource.class); types.put("datasource", Datasource.class);
types.put("organization", Organization.class); types.put("organization", Organization.class);
types.put("project", Project.class); types.put("project", Project.class);
types.put("dataset", Dataset.class); types.put("dataset", Dataset.class);
types.put("otherresearchproduct", OtherResearchProduct.class); types.put("otherresearchproduct", OtherResearchProduct.class);
types.put("software", Software.class); types.put("software", Software.class);
types.put("publication", Publication.class); types.put("publication", Publication.class);
types.put("relation", Relation.class); types.put("relation", Relation.class);
} }

View File

@ -15,30 +15,35 @@ public class SparkGraphImporterJob {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json"))); final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json")));
parser.parseArgument(args); parser.parseArgument(args);
final SparkSession spark = SparkSession
try(SparkSession spark = getSparkSession(parser)) {
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final String inputPath = parser.get("sourcePath");
final String hiveDbName = parser.get("hive_db_name");
spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", hiveDbName));
spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName));
// Read the input file and convert it into RDD of serializable object
GraphMappingUtils.types.forEach((name, clazz) -> {
spark.createDataset(sc.sequenceFile(inputPath + "/" + name, Text.class, Text.class)
.map(s -> new ObjectMapper().readValue(s._2().toString(), clazz))
.rdd(), Encoders.bean(clazz))
.write()
.mode(SaveMode.Overwrite)
.saveAsTable(hiveDbName + "." + name);
});
}
}
private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
return SparkSession
.builder() .builder()
.appName(SparkGraphImporterJob.class.getSimpleName()) .appName(SparkGraphImporterJob.class.getSimpleName())
.master(parser.get("master")) .master(parser.get("master"))
.config("hive.metastore.uris", parser.get("hive_metastore_uris")) .config("hive.metastore.uris", parser.get("hive_metastore_uris"))
.enableHiveSupport() .enableHiveSupport()
.getOrCreate(); .getOrCreate();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final String inputPath = parser.get("sourcePath");
final String hiveDbName = parser.get("hive_db_name");
spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", hiveDbName));
spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName));
// Read the input file and convert it into RDD of serializable object
GraphMappingUtils.types.forEach((name, clazz) -> {
spark.createDataset(sc.sequenceFile(inputPath + "/" + name, Text.class, Text.class)
.map(s -> new ObjectMapper().readValue(s._2().toString(), clazz))
.rdd(), Encoders.bean(clazz))
.write()
.mode(SaveMode.Overwrite)
.saveAsTable(hiveDbName + "." + name);
});
} }
} }

View File

@ -0,0 +1,8 @@
CREATE view result as
select id, dateofcollection, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.publication p
union all
select id, dateofcollection, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.dataset d
union all
select id, dateofcollection, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.software s
union all
select id, dateofcollection, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.otherresearchproduct o;

View File

@ -37,12 +37,29 @@
<name>MapGraphIntoDataFrame</name> <name>MapGraphIntoDataFrame</name>
<class>eu.dnetlib.dhp.graph.SparkGraphImporterJob</class> <class>eu.dnetlib.dhp.graph.SparkGraphImporterJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts> <spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg> <arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg> <arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_db_name</arg><arg>${hive_db_name}</arg> <arg>--hive_db_name</arg><arg>${hive_db_name}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg> <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
</spark> </spark>
<ok to="PostProcessing"/>
<error to="Kill"/>
</action>
<action name="PostProcessing">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<script>/eu/dnetlib/dhp/graph/hive/postprocessing.sql</script>
<param>hive_db_name=${hive_db_name}</param>
</hive>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>

View File

@ -50,9 +50,9 @@
<class>eu.dnetlib.dhp.graph.SparkXmlRecordBuilderJob</class> <class>eu.dnetlib.dhp.graph.SparkXmlRecordBuilderJob</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar> <jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCoresForJoining}
--executor-cores ${sparkExecutorCores} --executor-memory ${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemoryForJoining}
--conf spark.dynamicAllocation.maxExecutors=${sparkExecutorCoresForJoining} --conf spark.dynamicAllocation.maxExecutors=${sparkExecutorCoresForJoining}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
@ -79,8 +79,9 @@
<class>eu.dnetlib.dhp.graph.SparkXmlIndexingJob</class> <class>eu.dnetlib.dhp.graph.SparkXmlIndexingJob</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar> <jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCoresForIndexing}
--driver-memory=${sparkDriverMemory} --executor-memory ${sparkExecutorMemoryForIndexing}
--driver-memory=${sparkDriverMemoryForIndexing}
--conf spark.dynamicAllocation.maxExecutors=${sparkExecutorCoresForIndexing} --conf spark.dynamicAllocation.maxExecutors=${sparkExecutorCoresForIndexing}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"