1
0
Fork 0

added description

This commit is contained in:
Miriam Baglioni 2020-08-11 15:25:11 +02:00
parent ecd2081f84
commit 0ce49049d6
13 changed files with 64 additions and 100 deletions

View File

@ -1,3 +1,6 @@
/**
*
*/
package eu.dnetlib.dhp.oa.graph.dump; package eu.dnetlib.dhp.oa.graph.dump;
@ -36,50 +39,10 @@ public class DumpProducts implements Serializable {
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
Utils.removeOutputDir(spark, outputPath); Utils.removeOutputDir(spark, outputPath);
execDump(spark, inputPath, outputPath, communityMapPath, inputClazz, outputClazz, graph);// , execDump(spark, inputPath, outputPath, communityMapPath, inputClazz, outputClazz, graph);
// dumpClazz);
}); });
} }
// public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, CommunityMap communityMap,
// Class<? extends OafEntity> inputClazz,
// Class<? extends eu.dnetlib.dhp.schema.dump.oaf.Result> outputClazz,
// boolean graph) {
//
// SparkConf conf = new SparkConf();
//
// runWithSparkSession(
// conf,
// isSparkSessionManaged,
// spark -> {
// Utils.removeOutputDir(spark, outputPath);
// execDump(spark, inputPath, outputPath, communityMap, inputClazz, outputClazz, graph);// ,
// // dumpClazz);
// });
// }
// public static <I extends OafEntity, O extends eu.dnetlib.dhp.schema.dump.oaf.Result> void execDump(
// SparkSession spark,
// String inputPath,
// String outputPath,
// CommunityMap communityMap,
// Class<I> inputClazz,
// Class<O> outputClazz,
// boolean graph) {
//
// // CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);
//
// Utils
// .readPath(spark, inputPath, inputClazz)
// .map(value -> execMap(value, communityMap, graph), Encoders.bean(outputClazz))
// .filter(Objects::nonNull)
// .write()
// .mode(SaveMode.Overwrite)
// .option("compression", "gzip")
// .json(outputPath);
//
// }
public static <I extends OafEntity, O extends eu.dnetlib.dhp.schema.dump.oaf.Result> void execDump( public static <I extends OafEntity, O extends eu.dnetlib.dhp.schema.dump.oaf.Result> void execDump(
SparkSession spark, SparkSession spark,
String inputPath, String inputPath,

View File

@ -1,3 +1,7 @@
/**
* Deserialization of the information in the context needed to create Context Entities, and relations between
* context entities and datasources and projects
*/
package eu.dnetlib.dhp.oa.graph.dump.graph; package eu.dnetlib.dhp.oa.graph.dump.graph;

View File

@ -1,3 +1,9 @@
/**
* Writes on HDFS Context entities. It queries the Information System at the lookup url provided as parameter
* and collects the general information for contexes of type community or ri. The general information is
* the id of the context, its label, the subjects associated to the context, its zenodo community, description and type.
* This information is used to create a new Context Entity
*/
package eu.dnetlib.dhp.oa.graph.dump.graph; package eu.dnetlib.dhp.oa.graph.dump.graph;
@ -6,13 +12,9 @@ import java.io.IOException;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.io.Serializable; import java.io.Serializable;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import javax.rmi.CORBA.Util;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
@ -21,19 +23,12 @@ import org.apache.hadoop.fs.Path;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils; import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.dump.oaf.graph.ResearchCommunity;
import eu.dnetlib.dhp.schema.dump.oaf.graph.ResearchInitiative; import eu.dnetlib.dhp.schema.dump.oaf.graph.ResearchInitiative;
public class CreateContextEntities implements Serializable { public class CreateContextEntities implements Serializable {
// leggo i context dall'is e mi faccio la mappa id -> contextinfo
// creo le entities con le info generali
private static final Logger log = LoggerFactory.getLogger(CreateContextEntities.class); private static final Logger log = LoggerFactory.getLogger(CreateContextEntities.class);
private final Configuration conf; private final Configuration conf;
private final BufferedWriter writer; private final BufferedWriter writer;
@ -51,7 +46,7 @@ public class CreateContextEntities implements Serializable {
final String hdfsPath = parser.get("hdfsPath"); final String hdfsPath = parser.get("hdfsPath");
log.info("hdfsPath: {}", hdfsPath); log.info("hdfsPath: {}", hdfsPath);
final String hdfsNameNode = parser.get("hdfsNameNode"); final String hdfsNameNode = parser.get("nameNode");
log.info("nameNode: {}", hdfsNameNode); log.info("nameNode: {}", hdfsNameNode);
final String isLookUpUrl = parser.get("isLookUpUrl"); final String isLookUpUrl = parser.get("isLookUpUrl");

View File

@ -1,3 +1,8 @@
/**
* Writes the set of new Relation between the context and datasources. At the moment the relation
* between the context and the project is not created because of a low coverage in the profiles of
* openaire ids related to projects
*/
package eu.dnetlib.dhp.oa.graph.dump.graph; package eu.dnetlib.dhp.oa.graph.dump.graph;
@ -54,7 +59,7 @@ public class CreateContextRelation implements Serializable {
final String hdfsPath = parser.get("hdfsPath"); final String hdfsPath = parser.get("hdfsPath");
log.info("hdfsPath: {}", hdfsPath); log.info("hdfsPath: {}", hdfsPath);
final String hdfsNameNode = parser.get("hdfsNameNode"); final String hdfsNameNode = parser.get("nameNode");
log.info("nameNode: {}", hdfsNameNode); log.info("nameNode: {}", hdfsNameNode);
final String isLookUpUrl = parser.get("isLookUpUrl"); final String isLookUpUrl = parser.get("isLookUpUrl");
@ -101,8 +106,7 @@ public class CreateContextRelation implements Serializable {
} }
public void execute(final Function<ContextInfo, List<Relation>> producer, String category, String prefix) public void execute(final Function<ContextInfo, List<Relation>> producer, String category, String prefix) {
throws Exception {
final Consumer<ContextInfo> consumer = ci -> producer.apply(ci).forEach(c -> writeEntity(c)); final Consumer<ContextInfo> consumer = ci -> producer.apply(ci).forEach(c -> writeEntity(c));

View File

@ -1,3 +1,8 @@
/**
* Dumps of entities in the model defined in eu.dnetlib.dhp.schema.dump.oaf.graph.
* Results are dumped using the same Mapper as for eu.dnetlib.dhp.schema.dump.oaf.community, while for
* the other entities the mapping is defined below
*/
package eu.dnetlib.dhp.oa.graph.dump.graph; package eu.dnetlib.dhp.oa.graph.dump.graph;
@ -104,7 +109,6 @@ public class DumpGraphEntities implements Serializable {
private static Datasource mapDatasource(eu.dnetlib.dhp.schema.oaf.Datasource d) { private static Datasource mapDatasource(eu.dnetlib.dhp.schema.oaf.Datasource d) {
Datasource datasource = new Datasource(); Datasource datasource = new Datasource();
datasource.setId(d.getId()); datasource.setId(d.getId());
Optional Optional

View File

@ -1,3 +1,12 @@
/**
* Creates new Relations (as in eu.dnetlib.dhp.schema.dump.oaf.graph.Relation) from the information in the Entity.
* The new Relations are created for the datasource in the collectedfrom and hostedby elements and for the context
* related to communities and research initiative/infrastructures.
*
* For collectedfrom elements it creates: datasource -> provides -> result and result -> isProvidedBy -> datasource
* For hostedby elements it creates: datasource -> hosts -> result and result -> isHostedBy -> datasource
* For context elements it creates: context <-> isRelatedTo <-> result
*/
package eu.dnetlib.dhp.oa.graph.dump.graph; package eu.dnetlib.dhp.oa.graph.dump.graph;
@ -24,12 +33,11 @@ import eu.dnetlib.dhp.schema.oaf.Result;
public class Extractor implements Serializable { public class Extractor implements Serializable {
public void run(Boolean isSparkSessionManaged, public void run(Boolean isSparkSessionManaged,
String inputPath, String inputPath,
String outputPath, String outputPath,
Class<? extends Result> inputClazz, Class<? extends Result> inputClazz,
CommunityMap communityMap) { String communityMapPath) {
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
@ -39,11 +47,10 @@ public class Extractor implements Serializable {
spark -> { spark -> {
Utils.removeOutputDir(spark, outputPath); Utils.removeOutputDir(spark, outputPath);
extractRelationResult( extractRelationResult(
spark, inputPath, outputPath, inputClazz, communityMap); spark, inputPath, outputPath, inputClazz, Utils.getCommunityMap(spark, communityMapPath));
}); });
} }
private <R extends Result> void extractRelationResult(SparkSession spark, private <R extends Result> void extractRelationResult(SparkSession spark,
String inputPath, String inputPath,
String outputPath, String outputPath,

View File

@ -1,3 +1,8 @@
/**
* It process the ContextInfo information to produce a new Context Entity or a set of Relations between the
* generic context entity and datasource/projects related to the context.
*
*/
package eu.dnetlib.dhp.oa.graph.dump.graph; package eu.dnetlib.dhp.oa.graph.dump.graph;
@ -8,8 +13,6 @@ import java.util.List;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import eu.dnetlib.dhp.oa.graph.dump.Constants; import eu.dnetlib.dhp.oa.graph.dump.Constants;
import eu.dnetlib.dhp.oa.graph.dump.Utils; import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
@ -37,7 +40,6 @@ public class Process implements Serializable {
ri.setDescription(ci.getDescription()); ri.setDescription(ci.getDescription());
ri.setName(ci.getName()); ri.setName(ci.getName());
ri.setZenodo_community(Constants.ZENODO_COMMUNITY_PREFIX + ci.getZenodocommunity()); ri.setZenodo_community(Constants.ZENODO_COMMUNITY_PREFIX + ci.getZenodocommunity());
// log.info("created context: {}", new Gson().toJson(ri));
return (R) ri; return (R) ri;
} catch (final Exception e) { } catch (final Exception e) {

View File

@ -126,13 +126,6 @@ public class QueryInformationSystem {
} }
} }
// cat_iterator = el.elementIterator();
// while (cat_iterator.hasNext()) {
// Element catEl = (Element) cat_iterator.next();
// if (catEl.getName().equals("param") && catEl.attribute("name").getValue().equals("openaireId")) {
// datasourceList.add(catEl.getText());
// }
// }
return datasourceList; return datasourceList;
} }

View File

@ -1,4 +1,7 @@
/**
* Reads all the entities of the same type (Relation / Results) and saves them in the same folder
*
*/
package eu.dnetlib.dhp.oa.graph.dump.graph; package eu.dnetlib.dhp.oa.graph.dump.graph;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
@ -75,8 +78,9 @@ public class SparkCollectAndSave implements Serializable {
.union(Utils.readPath(spark, inputPath + "/relation/software", Relation.class)) .union(Utils.readPath(spark, inputPath + "/relation/software", Relation.class))
.union(Utils.readPath(spark, inputPath + "/relation/contextOrg", Relation.class)) .union(Utils.readPath(spark, inputPath + "/relation/contextOrg", Relation.class))
.union(Utils.readPath(spark, inputPath + "/relation/context", Relation.class)) .union(Utils.readPath(spark, inputPath + "/relation/context", Relation.class))
.union(Utils.readPath(spark, inputPath + "/relation/relation", Relation.class))
.write() .write()
.mode(SaveMode.Append) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
.json(outputPath + "/relation"); .json(outputPath + "/relation");

View File

@ -1,17 +1,14 @@
/**
* Spark Job that fires the dump for the entites
*/
package eu.dnetlib.dhp.oa.graph.dump.graph; package eu.dnetlib.dhp.oa.graph.dump.graph;
import java.io.Serializable; import java.io.Serializable;
import java.util.Optional; import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.OafEntity;
public class SparkDumpEntitiesJob implements Serializable { public class SparkDumpEntitiesJob implements Serializable {
@ -44,18 +41,10 @@ public class SparkDumpEntitiesJob implements Serializable {
final String communityMapPath = parser.get("communityMapPath"); final String communityMapPath = parser.get("communityMapPath");
final String isLookUpUrl = parser.get("isLookUpUrl");
log.info("isLookUpUrl: {}", isLookUpUrl);
Class<? extends OafEntity> inputClazz = (Class<? extends OafEntity>) Class.forName(resultClassName); Class<? extends OafEntity> inputClazz = (Class<? extends OafEntity>) Class.forName(resultClassName);
QueryInformationSystem queryInformationSystem = new QueryInformationSystem();
queryInformationSystem.setIsLookUp(Utils.getIsLookUpService(isLookUpUrl));
CommunityMap communityMap = queryInformationSystem.getCommunityMap();
DumpGraphEntities dg = new DumpGraphEntities(); DumpGraphEntities dg = new DumpGraphEntities();
dg.run(isSparkSessionManaged, inputPath, outputPath, inputClazz, communityMapPath); dg.run(isSparkSessionManaged, inputPath, outputPath, inputClazz, communityMapPath);
// dg.run(isSparkSessionManaged, inputPath, outputPath, inputClazz, communityMap);
} }

View File

@ -1,4 +1,6 @@
/**
* Dumps eu.dnetlib.dhp.schema.oaf.Relation in eu.dnetlib.dhp.schema.dump.oaf.graph.Relation
*/
package eu.dnetlib.dhp.oa.graph.dump.graph; package eu.dnetlib.dhp.oa.graph.dump.graph;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

View File

@ -1,4 +1,6 @@
/**
* Spark job that fires the extraction of relations from entities
*/
package eu.dnetlib.dhp.oa.graph.dump.graph; package eu.dnetlib.dhp.oa.graph.dump.graph;
import java.io.Serializable; import java.io.Serializable;
@ -42,20 +44,12 @@ public class SparkExtractRelationFromEntities implements Serializable {
final String resultClassName = parser.get("resultTableName"); final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName); log.info("resultTableName: {}", resultClassName);
// final String communityMapPath = parser.get("communityMapPath"); final String communityMapPath = parser.get("communityMapPath");
final String isLookUpUrl = parser.get("isLookUpUrl");
log.info("isLookUpUrl: {}", isLookUpUrl);
Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName); Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
QueryInformationSystem queryInformationSystem = new QueryInformationSystem();
queryInformationSystem.setIsLookUp(Utils.getIsLookUpService(isLookUpUrl));
CommunityMap communityMap = queryInformationSystem.getCommunityMap();
Extractor extractor = new Extractor(); Extractor extractor = new Extractor();
// extractor.run(isSparkSessionManaged, inputPath, outputPath, inputClazz, communityMapPath); extractor.run(isSparkSessionManaged, inputPath, outputPath, inputClazz, communityMapPath);
extractor.run(isSparkSessionManaged, inputPath, outputPath, inputClazz, communityMap);
} }

View File

@ -1,4 +1,7 @@
/**
* Create new Relations between Context Entities and Organizations whose products are associated to the context.
* It produces relation such as: organization <-> isRelatedTo <-> context
*/
package eu.dnetlib.dhp.oa.graph.dump.graph; package eu.dnetlib.dhp.oa.graph.dump.graph;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;