
Merge branch 'master' into przemyslawjacewicz_actionmanager_impl_prototype

Przemysław Jacewicz 2020-02-26 11:50:20 +01:00
commit 02db368dc5
25 changed files with 484 additions and 419 deletions

View File

@@ -46,6 +46,18 @@
<groupId>net.sf.saxon</groupId>
<artifactId>Saxon-HE</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib</groupId>
<artifactId>cnr-rmi-api</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -1,4 +1,4 @@
package eu.dnetlib.dhp.graph.utils;
package eu.dnetlib.dhp.utils;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.commons.logging.Log;

View File

@@ -30,6 +30,12 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>

View File

@@ -1,66 +1,83 @@
package eu.dnetlib.dhp.schema.oaf;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.Assert;
public class Relation extends Oaf {
private String relType;
private String subRelType;
private String relClass;
private String source;
private String target;
private List<KeyValue> collectedFrom;
private List<KeyValue> collectedFrom = new ArrayList<>();
public String getRelType() {
return relType;
}
public void setRelType(String relType) {
public void setRelType(final String relType) {
this.relType = relType;
}
public String getSubRelType() {
return subRelType;
}
public void setSubRelType(String subRelType) {
public void setSubRelType(final String subRelType) {
this.subRelType = subRelType;
}
public String getRelClass() {
return relClass;
}
public void setRelClass(String relClass) {
public void setRelClass(final String relClass) {
this.relClass = relClass;
}
public String getSource() {
return source;
}
public void setSource(String source) {
public void setSource(final String source) {
this.source = source;
}
public String getTarget() {
return target;
}
public void setTarget(String target) {
public void setTarget(final String target) {
this.target = target;
}
public List<KeyValue> getCollectedFrom() {
return collectedFrom;
}
public void setCollectedFrom(final List<KeyValue> collectedFrom) {
this.collectedFrom = collectedFrom;
}
public void mergeFrom(final Relation r) {
Assert.assertEquals("source ids must be equal", getSource(), r.getSource());
Assert.assertEquals("target ids must be equal", getTarget(), r.getTarget());
Assert.assertEquals("relType(s) must be equal", getRelType(), r.getRelType());
Assert.assertEquals("subRelType(s) must be equal", getSubRelType(), r.getSubRelType());
Assert.assertEquals("relClass(es) must be equal", getRelClass(), r.getRelClass());
setCollectedFrom(Stream.concat(getCollectedFrom().stream(), r.getCollectedFrom().stream())
.distinct() // relies on KeyValue.equals
.collect(Collectors.toList()));
}
public void setCollectedFrom(List<KeyValue> collectedFrom) {
this.collectedFrom = collectedFrom;
}
}
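A minimal usage sketch of the new mergeFrom method (not part of this commit; identifiers are placeholders): it only merges relations whose source, target, relType, subRelType and relClass already match, and concatenates the two collectedFrom lists, de-duplicating them via KeyValue.equals. The guards use org.junit.Assert, which presumably explains the junit dependency added without test scope earlier in this diff.

Relation a = new Relation();
a.setRelType("resultProject");
a.setSubRelType("outcome");
a.setRelClass("isProducedBy");
a.setSource("50|doi_________::aaa111");
a.setTarget("40|corda_______::bbb222");

Relation b = new Relation();
b.setRelType("resultProject");
b.setSubRelType("outcome");
b.setRelClass("isProducedBy");
b.setSource("50|doi_________::aaa111");
b.setTarget("40|corda_______::bbb222");

a.mergeFrom(b); // a.getCollectedFrom() now holds the distinct union of both collectedFrom lists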

View File

@@ -227,7 +227,21 @@ public class AbstractMigrationExecutor implements Closeable {
final String nsPrefix = StringUtils.substringBefore(originalId, "::");
final String rest = StringUtils.substringAfter(originalId, "::");
return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(rest));
}
public static String createOpenaireId(final String type, final String originalId) {
switch (type) {
case "datasource":
return createOpenaireId(10, originalId);
case "organization":
return createOpenaireId(20, originalId);
case "person":
return createOpenaireId(30, originalId);
case "project":
return createOpenaireId(40, originalId);
default:
return createOpenaireId(50, originalId);
}
}
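// Hedged illustration of the new createOpenaireId(type, originalId) overload above
// (the md5 suffixes are placeholders for real DHPUtils.md5 output):
//   createOpenaireId("datasource", "openaire____::od______1234") -> "10|openaire____::" + md5("od______1234")
//   createOpenaireId("organization", "grid________::grid.1234.5") -> "20|grid________::" + md5("grid.1234.5")
//   createOpenaireId("project", "corda_______::123456") -> "40|corda_______::" + md5("123456")
//   any other type (e.g. a result) falls into the default branch and gets the "50|" prefix.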
public static String asString(final Object o) {

View File

@@ -398,6 +398,8 @@ public abstract class AbstractMongoExecutor extends AbstractMigrationExecutor {
protected DataInfo prepareDataInfo(final Document doc) {
final Node n = doc.selectSingleNode("//oaf:datainfo");
if (n == null) { return null; }
final String paClassId = n.valueOf("./oaf:provenanceaction/@classid");
final String paClassName = n.valueOf("./oaf:provenanceaction/@classname");
final String paSchemeId = n.valueOf("./oaf:provenanceaction/@schemeid");

View File

@@ -1,56 +1,94 @@
package eu.dnetlib.dhp.migration;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.util.Arrays;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import java.util.List;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Software;
import scala.Tuple2;
public class ExtractEntitiesFromHDFSJob {
private static final Log log = LogFactory.getLog(ExtractEntitiesFromHDFSJob.class);
private static List<String> folderNames = Arrays.asList("db_entities", "oaf_entities", "odf_entities");
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils.toString(MigrateMongoMdstoresApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json")));
parser.parseArgument(args);
public static void main(String[] args) throws Exception {
final SparkSession spark = SparkSession
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
.builder()
IOUtils.toString(MigrateMongoMdstoresApplication.class.getResourceAsStream("/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json")));
.appName(ExtractEntitiesFromHDFSJob.class.getSimpleName())
parser.parseArgument(args);
.master(parser.get("master"))
.getOrCreate();
final SparkSession spark = SparkSession
try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
.builder()
.appName(ExtractEntitiesFromHDFSJob.class.getSimpleName())
.master(parser.get("master"))
.getOrCreate();
final String sourcePath = parser.get("sourcePath");
final List<String> sourcePaths = Arrays.stream(parser.get("sourcePaths").split(",")).filter(p -> exists(sc, p)).collect(Collectors.toList());
final String targetPath = parser.get("graphRawPath");
final String entity = parser.get("entity");
processEntity(sc, Publication.class, sourcePaths, targetPath);
processEntity(sc, Dataset.class, sourcePaths, targetPath);
processEntity(sc, Software.class, sourcePaths, targetPath);
processEntity(sc, OtherResearchProduct.class, sourcePaths, targetPath);
processEntity(sc, Datasource.class, sourcePaths, targetPath);
processEntity(sc, Organization.class, sourcePaths, targetPath);
processEntity(sc, Project.class, sourcePaths, targetPath);
processEntity(sc, Relation.class, sourcePaths, targetPath);
}
}
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
private static void processEntity(final JavaSparkContext sc, final Class<?> clazz, final List<String> sourcePaths, final String targetPath) {
final String type = clazz.getSimpleName().toLowerCase();
log.info(String.format("Processing entities (%s) in files:", type));
sourcePaths.forEach(log::info);
JavaRDD<String> inputRdd = sc.emptyRDD();
for (final String sp : sourcePaths) {
inputRdd = inputRdd.union(sc.sequenceFile(sp, Text.class, Text.class)
.map(k -> new Tuple2<>(k._1().toString(), k._2().toString()))
.filter(k -> isEntityType(k._1(), type))
.map(Tuple2::_2));
}
folderNames.forEach(p -> inputRdd.union(
inputRdd.saveAsTextFile(targetPath + "/" + type);
sc.sequenceFile(sourcePath+"/"+p, Text.class, Text.class)
.map(k -> new Tuple2<>(k._1().toString(), k._2().toString()))
.filter(k -> isEntityType(k._1(), entity))
.map(Tuple2::_2))
);
inputRdd.saveAsTextFile(targetPath+"/"+entity);
}
}
private static boolean isEntityType(final String item, final String type) {
return StringUtils.substringAfter(item, ":").equalsIgnoreCase(type);
}
private static boolean isEntityType(final String item, final String entity) {
private static boolean exists(final JavaSparkContext context, final String pathToFile) {
return StringUtils.substringAfter(item, ":").equalsIgnoreCase(entity);
try {
}
final FileSystem hdfs = org.apache.hadoop.fs.FileSystem.get(context.hadoopConfiguration());
final Path path = new Path(pathToFile);
return hdfs.exists(path);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
}
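A minimal invocation sketch for the reworked job (not part of this commit; the -mt/-s/-g aliases and values mirror the Oozie action further down in this diff, and the HDFS directories are placeholders):

ExtractEntitiesFromHDFSJob.main(new String[] {
"-mt", "local[*]",
"-s", "/tmp/migration/db_entities,/tmp/migration/oaf_entities,/tmp/migration/odf_entities",
"-g", "/tmp/graph_raw"
});

Paths that do not exist are filtered out up front by the exists() helper, and each entity type is written to its own sub-directory of the target path (e.g. ${graphRawPath}/publication).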

View File

@@ -17,15 +17,21 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor implements Closeable {
@@ -53,22 +59,28 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl
final String hdfsNameNode = parser.get("namenode");
final String hdfsUser = parser.get("hdfsUser");
final boolean processClaims = parser.get("action") != null && parser.get("action").equalsIgnoreCase("claims");
try (final MigrateDbEntitiesApplication smdbe = new MigrateDbEntitiesApplication(hdfsPath, hdfsNameNode, hdfsUser, dbUrl, dbUser, dbPassword)) {
log.info("Processing datasources...");
if (processClaims) {
smdbe.execute("queryDatasources.sql", smdbe::processDatasource);
log.info("Processing claims...");
smdbe.execute("queryClaims.sql", smdbe::processClaims);
} else {
log.info("Processing datasources...");
smdbe.execute("queryDatasources.sql", smdbe::processDatasource);
log.info("Processing projects..."); log.info("Processing projects...");
smdbe.execute("queryProjects.sql", smdbe::processProject); smdbe.execute("queryProjects.sql", smdbe::processProject);
log.info("Processing orgs..."); log.info("Processing orgs...");
smdbe.execute("queryOrganizations.sql", smdbe::processOrganization); smdbe.execute("queryOrganizations.sql", smdbe::processOrganization);
log.info("Processing relations ds <-> orgs ..."); log.info("Processing relations ds <-> orgs ...");
smdbe.execute("queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization); smdbe.execute("queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization);
log.info("Processing projects <-> orgs ...");
smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization);
log.info("Processing projects <-> orgs ...");
smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization);
}
log.info("All done."); log.info("All done.");
} }
} }
@@ -377,7 +389,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl
r2.setTarget(dsId);
r2.setCollectedFrom(collectedFrom);
r2.setDataInfo(info);
r1.setLastupdatetimestamp(lastUpdateTimestamp);
r2.setLastupdatetimestamp(lastUpdateTimestamp);
emitOaf(r2);
// rs.getString("datasource");
@@ -426,7 +438,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl
r2.setTarget(projectId);
r2.setCollectedFrom(collectedFrom);
r2.setDataInfo(info);
r1.setLastupdatetimestamp(lastUpdateTimestamp);
r2.setLastupdatetimestamp(lastUpdateTimestamp);
emitOaf(r2);
// rs.getString("project");
@@ -450,6 +462,81 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl
}
}
public void processClaims(final ResultSet rs) {
final DataInfo info =
dataInfo(false, null, false, false, qualifier("user:claim", "user:claim", "dnet:provenanceActions", "dnet:provenanceActions"), "0.9");
try {
if (rs.getString("source_type").equals("context")) {
final Result r;
if (rs.getString("target_type").equals("dataset")) {
r = new Dataset();
} else if (rs.getString("target_type").equals("software")) {
r = new Software();
} else if (rs.getString("target_type").equals("other")) {
r = new OtherResearchProduct();
} else {
r = new Publication();
}
r.setId(createOpenaireId(50, rs.getString("target_id")));
r.setLastupdatetimestamp(lastUpdateTimestamp);
r.setContext(prepareContext(rs.getString("source_id"), info));
r.setDataInfo(info);
emitOaf(r);
} else {
final String sourceId = createOpenaireId(rs.getString("source_type"), rs.getString("source_id"));
final String targetId = createOpenaireId(rs.getString("target_type"), rs.getString("target_id"));
final Relation r1 = new Relation();
final Relation r2 = new Relation();
if (rs.getString("source_type").equals("project")) {
r1.setRelType("resultProject");
r1.setSubRelType("outcome");
r1.setRelClass("produces");
r2.setRelType("resultProject");
r2.setSubRelType("outcome");
r2.setRelClass("isProducedBy");
} else {
r1.setRelType("resultResult");
r1.setSubRelType("relationship");
r1.setRelClass("isRelatedTo");
r2.setRelType("resultResult");
r2.setSubRelType("relationship");
r2.setRelClass("isRelatedTo");
}
r1.setSource(sourceId);
r1.setTarget(targetId);
r1.setDataInfo(info);
r1.setLastupdatetimestamp(lastUpdateTimestamp);
emitOaf(r1);
r2.setSource(targetId);
r2.setTarget(sourceId);
r2.setDataInfo(info);
r2.setLastupdatetimestamp(lastUpdateTimestamp);
emitOaf(r2);
}
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
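// Hedged illustration of the mapping above for one approved claim row (ids are placeholders):
//   source_type=project, source_id=corda_______::123, target_type=publication, target_id=doi_________::xyz
//   -> r1: relType=resultProject, subRelType=outcome, relClass=produces,
//          source=40|corda_______::md5(123), target=50|doi_________::md5(xyz)
//   -> r2: the inverse relation (isProducedBy) with source and target swapped.
// Claims whose source_type is "context" instead attach the claimed context id to the target result.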
private List<Context> prepareContext(final String id, final DataInfo dataInfo) {
final Context context = new Context();
context.setId(id);
context.setDataInfo(Arrays.asList(dataInfo));
return Arrays.asList(context);
}
private DataInfo prepareDataInfo(final ResultSet rs) throws SQLException {
final Boolean deletedbyinference = rs.getBoolean("deletedbyinference");
final String inferenceprovenance = rs.getString("inferenceprovenance");

View File

@@ -1,8 +1,8 @@
[
{
"paramName": "s",
"paramLongName": "sourcePath",
"paramLongName": "sourcePaths",
"paramDescription": "the HDFS source path which contains the sequential file",
"paramDescription": "the HDFS source paths which contains the sequential file (comma separated)",
"paramRequired": true
},
{
@@ -16,11 +16,5 @@
"paramLongName": "graphRawPath",
"paramDescription": "the path of the graph Raw in hdfs",
"paramRequired": true
},
{
"paramName": "e",
"paramLongName": "entity",
"paramDescription": "The entity to extract",
"paramRequired": true
}
]

View File

@@ -34,5 +34,11 @@
"paramLongName": "postgresPassword",
"paramDescription": "postgres password",
"paramRequired": false
},
{
"paramName": "a",
"paramLongName": "action",
"paramDescription": "process claims",
"paramRequired": false
}
]

View File

@@ -43,8 +43,7 @@
</property>
</parameters>
<start to="ResetWorkingPath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@@ -71,7 +70,24 @@
<arg>-dbuser</arg><arg>${postgresUser}</arg>
<arg>-dbpasswd</arg><arg>${postgresPassword}</arg>
</java>
<ok to="ImportODFEntitiesFromMongoDB"/>
<ok to="ImportClaimsFromPostgres"/>
<error to="Kill"/>
</action>
<action name="ImportClaimsFromPostgres">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.MigrateDbEntitiesApplication</main-class>
<arg>-p</arg><arg>${workingPath}/db_claims</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-u</arg><arg>${hdfsUser}</arg>
<arg>-dburl</arg><arg>${postgresURL}</arg>
<arg>-dbuser</arg><arg>${postgresUser}</arg>
<arg>-dbpasswd</arg><arg>${postgresPassword}</arg>
<arg>-a</arg><arg>claims</arg>
</java>
<ok to="ImportODFEntitiesFromMongoDB"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
@@ -113,170 +129,78 @@
<arg>-pguser</arg><arg>${postgresUser}</arg>
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
</java>
<ok to="End"/>
<ok to="ImportODFClaimsFromMongoDB"/>
<error to="Kill"/>
</action>
<action name="ImportODFClaimsFromMongoDB">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${workingPath}/odf_claims</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-u</arg><arg>${hdfsUser}</arg>
<arg>-mongourl</arg><arg>${mongourl}</arg>
<arg>-db</arg><arg>${mongoDb}</arg>
<arg>-f</arg><arg>ODF</arg>
<arg>-l</arg><arg>store</arg>
<arg>-i</arg><arg>claim</arg>
<arg>-pgurl</arg><arg>${postgresURL}</arg>
<arg>-pguser</arg><arg>${postgresUser}</arg>
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
</java>
<ok to="ImportOAFClaimsFromMongoDB"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="ImportOAFClaimsFromMongoDB">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${workingPath}/oaf_claims</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-u</arg><arg>${hdfsUser}</arg>
<arg>-mongourl</arg><arg>${mongourl}</arg>
<arg>-db</arg><arg>${mongoDb}</arg>
<arg>-f</arg><arg>OAF</arg>
<arg>-l</arg><arg>store</arg>
<arg>-i</arg><arg>claim</arg>
<arg>-pgurl</arg><arg>${postgresURL}</arg>
<arg>-pguser</arg><arg>${postgresUser}</arg>
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
</java>
<ok to="ResetGraphRawPath"/>
<error to="Kill"/>
</action>
<action name="ExtractPublication"> <action name="ResetGraphRawPath">
<fs>
<delete path='${graphRawPath}'/>
<mkdir path='${graphRawPath}'/>
</fs>
<ok to="ExtractEntitiesInGraphRawPath"/>
<error to="Kill"/>
</action>
<action name="ExtractEntitiesInGraphRawPath">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExtractEntities: publication</name>
<name>ExtractEntities</name>
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingPath}</arg>
<arg>-s</arg><arg>${workingPath}/db_entities,${workingPath}/oaf_entities,${workingPath}/odf_entities</arg>
<arg>-g</arg><arg>${graphRawPath}/publication</arg>
<arg>-g</arg><arg>${graphRawPath}</arg>
<arg>-e</arg><arg>publication</arg>
</spark>
<ok to="ExtractDataset"/>
<error to="Kill"/>
</action>
<action name="ExtractDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExtractEntities: dataset</name>
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingPath}</arg>
<arg>-g</arg><arg>${graphRawPath}/dataset</arg>
<arg>-e</arg><arg>dataset</arg>
</spark>
<ok to="ExtractSoftware"/>
<error to="Kill"/>
</action>
<action name="ExtractSoftware">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExtractEntities: software</name>
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingPath}</arg>
<arg>-g</arg><arg>${graphRawPath}/software</arg>
<arg>-e</arg><arg>software</arg>
</spark>
<ok to="ExtractORP"/>
<error to="Kill"/>
</action>
<action name="ExtractORP">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExtractEntities: otherresearchproduct</name>
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingPath}</arg>
<arg>-g</arg><arg>${graphRawPath}/otherresearchproduct</arg>
<arg>-e</arg><arg>otherresearchproduct</arg>
</spark>
<ok to="ExtractDatasource"/>
<error to="Kill"/>
</action>
<action name="ExtractDatasource">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExtractEntities: datasource</name>
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingPath}</arg>
<arg>-g</arg><arg>${graphRawPath}/datasource</arg>
<arg>-e</arg><arg>datasource</arg>
</spark>
<ok to="ExtractOrganization"/>
<error to="Kill"/>
</action>
<action name="ExtractOrganization">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExtractEntities: organization</name>
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingPath}</arg>
<arg>-g</arg><arg>${graphRawPath}/organization</arg>
<arg>-e</arg><arg>organization</arg>
</spark>
<ok to="ExtractProject"/>
<error to="Kill"/>
</action>
<action name="ExtractProject">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExtractEntities: project</name>
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingPath}</arg>
<arg>-g</arg><arg>${graphRawPath}/project</arg>
<arg>-e</arg><arg>project</arg>
</spark>
<ok to="ExtractRelation"/>
<error to="Kill"/>
</action>
<action name="ExtractRelation">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExtractEntities: relation</name>
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingPath}</arg>
<arg>-g</arg><arg>${graphRawPath}/relation</arg>
<arg>-e</arg><arg>relation</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@@ -0,0 +1 @@
SELECT source_type, source_id, target_type, target_id, semantics FROM claim WHERE approved=TRUE;

View File

@@ -28,15 +28,17 @@ SELECT
p.summary AS summary,
p.currency AS currency,
p.totalcost AS totalcost,
p.fundedamount AS fundedamount,
dc.id AS collectedfromid,
dc.officialname AS collectedfromname,
ctc.code || '@@@' || ctc.name || '@@@' || cts.code || '@@@' || cts.name AS contracttype,
p.contracttype || '@@@' || p.contracttypename || '@@@' || p.contracttypescheme || '@@@' || p.contracttypescheme AS contracttype,
pac.code || '@@@' || pac.name || '@@@' || pas.code || '@@@' || pas.name AS provenanceaction,
array_agg(DISTINCT i.pid || '###' || i.issuertype) AS pid,
array_agg(DISTINCT s.name || '###' || sc.code || '@@@' || sc.name || '@@@' || ss.code || '@@@' || ss.name) AS subjects,
array_agg(DISTINCT fp.path) AS fundingtree
FROM projects p
LEFT OUTER JOIN class pac ON (pac.code = p.provenanceactionclass)
LEFT OUTER JOIN scheme pas ON (pas.code = p.provenanceactionscheme)
@@ -54,9 +56,6 @@ SELECT
LEFT OUTER JOIN class sc ON (sc.code = s.semanticclass)
LEFT OUTER JOIN scheme ss ON (ss.code = s.semanticscheme)
LEFT OUTER JOIN class ctc ON (ctc.code = p.contracttypeclass)
LEFT OUTER JOIN scheme cts ON (cts.code = p.contracttypescheme)
GROUP BY
p.id,
p.code,
@@ -87,4 +86,4 @@ SELECT
dc.id,
dc.officialname,
pac.code, pac.name, pas.code, pas.name,
ctc.code, ctc.name, cts.code, cts.name;
p.contracttype , p.contracttypename, p.contracttypescheme;

View File

@@ -0,0 +1,90 @@
SELECT
p.id AS projectid,
p.code AS code,
p.websiteurl AS websiteurl,
p.acronym AS acronym,
p.title AS title,
p.startdate AS startdate,
p.enddate AS enddate,
p.call_identifier AS callidentifier,
p.keywords AS keywords,
p.duration AS duration,
p.ec_sc39 AS ecsc39,
p.oa_mandate_for_publications AS oamandatepublications,
p.ec_article29_3 AS ecarticle29_3,
p.dateofcollection AS dateofcollection,
p.lastupdate AS dateoftransformation,
p.inferred AS inferred,
p.deletedbyinference AS deletedbyinference,
p.trust AS trust,
p.inferenceprovenance AS inferenceprovenance,
p.optional1 AS optional1,
p.optional2 AS optional2,
p.jsonextrainfo AS jsonextrainfo,
p.contactfullname AS contactfullname,
p.contactfax AS contactfax,
p.contactphone AS contactphone,
p.contactemail AS contactemail,
p.summary AS summary,
p.currency AS currency,
p.totalcost AS totalcost,
p.fundedamount AS fundedamount,
dc.id AS collectedfromid,
dc.officialname AS collectedfromname,
ctc.code || '@@@' || ctc.name || '@@@' || cts.code || '@@@' || cts.name AS contracttype,
pac.code || '@@@' || pac.name || '@@@' || pas.code || '@@@' || pas.name AS provenanceaction,
array_agg(DISTINCT i.pid || '###' || i.issuertype) AS pid,
array_agg(DISTINCT s.name || '###' || sc.code || '@@@' || sc.name || '@@@' || ss.code || '@@@' || ss.name) AS subjects,
array_agg(DISTINCT fp.path) AS fundingtree
FROM projects p
LEFT OUTER JOIN class pac ON (pac.code = p.provenanceactionclass)
LEFT OUTER JOIN scheme pas ON (pas.code = p.provenanceactionscheme)
LEFT OUTER JOIN projectpids pp ON (pp.project = p.id)
LEFT OUTER JOIN dsm_identities i ON (i.pid = pp.pid)
LEFT OUTER JOIN dsm_datasources dc ON (dc.id = p.collectedfrom)
LEFT OUTER JOIN project_fundingpath pf ON (pf.project = p.id)
LEFT OUTER JOIN fundingpaths fp ON (fp.id = pf.funding)
LEFT OUTER JOIN project_subject ps ON (ps.project = p.id)
LEFT OUTER JOIN subjects s ON (s.id = ps.subject)
LEFT OUTER JOIN class sc ON (sc.code = s.semanticclass)
LEFT OUTER JOIN scheme ss ON (ss.code = s.semanticscheme)
LEFT OUTER JOIN class ctc ON (ctc.code = p.contracttypeclass)
LEFT OUTER JOIN scheme cts ON (cts.code = p.contracttypescheme)
GROUP BY
p.id,
p.code,
p.websiteurl,
p.acronym,
p.title,
p.startdate,
p.enddate,
p.call_identifier,
p.keywords,
p.duration,
p.ec_sc39,
p.oa_mandate_for_publications,
p.ec_article29_3,
p.dateofcollection,
p.inferred,
p.deletedbyinference,
p.trust,
p.inferenceprovenance,
p.contactfullname,
p.contactfax,
p.contactphone,
p.contactemail,
p.summary,
p.currency,
p.totalcost,
p.fundedamount,
dc.id,
dc.officialname,
pac.code, pac.name, pas.code, pas.name,
ctc.code, ctc.name, cts.code, cts.name;

View File

@@ -66,14 +66,6 @@
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib</groupId>
<artifactId>cnr-rmi-api</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>

View File

@@ -2,8 +2,8 @@ package eu.dnetlib.dhp.graph;
import com.lucidworks.spark.util.SolrSupport;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.graph.utils.ISLookupClientFactory;
import eu.dnetlib.dhp.graph.utils.StreamingInputDocumentFactory;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;

View File

@@ -1,6 +1,7 @@
package eu.dnetlib.dhp.graph.utils;
import com.google.common.base.Joiner;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.dom4j.Document;

View File

@@ -43,8 +43,6 @@ public class XmlRecordFactory implements Serializable {
private String schemaLocation;
private Set<String> contextes = Sets.newHashSet();
private boolean indent = false;
public XmlRecordFactory(
@@ -59,15 +57,18 @@ public class XmlRecordFactory implements Serializable {
}
public String build(final JoinedEntity je) {
final Set<String> contexts = Sets.newHashSet();
final OafEntity entity = je.getEntity();
TemplateFactory templateFactory = new TemplateFactory();
try {
final List<String> metadata = metadata(je.getType(), entity);
final List<String> metadata = metadata(je.getType(), entity, contexts);
// rels has to be processed before the contexts because they enrich the contextMap with the funding info.
final List<String> relations = listRelations(je, templateFactory);
final List<String> relations = listRelations(je, templateFactory, contexts);
metadata.addAll(buildContexts(getMainType(je.getType())));
metadata.addAll(buildContexts(getMainType(je.getType()), contexts));
metadata.add(parseDataInfo(entity.getDataInfo()));
final String body = templateFactory.buildBody(
@@ -97,10 +98,11 @@
}
}
private List<String> metadata(final String type, final OafEntity entity) {
private List<String> metadata(final String type, final OafEntity entity, final Set<String> contexts) {
final List<String> metadata = Lists.newArrayList();
if (entity.getCollectedfrom() != null) {
metadata.addAll(entity.getCollectedfrom()
.stream()
@@ -123,6 +125,17 @@
if (GraphMappingUtils.isResult(type)) {
final Result r = (Result) entity;
if (r.getContext() != null) {
contexts.addAll(r.getContext()
.stream()
.map(c -> c.getId())
.collect(Collectors.toList()));
/* FIXME: Workaround for CLARIN mining issue: #3670#note-29 */
if (contexts.contains("dh-ch::subcommunity::2")) {
contexts.add("clarin");
}
}
if (r.getTitle() != null) {
metadata.addAll(r.getTitle()
.stream()
@@ -235,16 +248,6 @@
}
metadata.add(mapQualifier("bestaccessright", getBestAccessright(r)));
if (r.getContext() != null) {
contextes.addAll(r.getContext()
.stream()
.map(c -> c.getId())
.collect(Collectors.toList()));
if (contextes.contains("dh-ch::subcommunity::2")) {
contextes.add("clarin");
}
}
}
switch (EntityType.valueOf(type)) {
@@ -618,7 +621,7 @@
return bestAccessRight;
}
private List<String> listRelations(final JoinedEntity je, TemplateFactory templateFactory) {
private List<String> listRelations(final JoinedEntity je, TemplateFactory templateFactory, final Set<String> contexts) {
final List<String> rels = Lists.newArrayList();
for (final Tuple2 link : je.getLinks()) {
@@ -699,7 +702,7 @@
if (re.getFundingtree() != null) {
metadata.addAll(re.getFundingtree()
.stream()
.peek(ft -> fillContextMap(ft))
.peek(ft -> fillContextMap(ft, contexts))
.map(ft -> getRelFundingTree(ft))
.collect(Collectors.toList()));
}
@@ -807,14 +810,14 @@
.collect(Collectors.toList()) : Lists.newArrayList();
}
private List<String> buildContexts(final String type) {
private List<String> buildContexts(final String type, final Set<String> contexts) {
final List<String> res = Lists.newArrayList();
if ((contextMapper != null) && !contextMapper.isEmpty() && MainEntityType.result.toString().equals(type)) {
XMLTag document = XMLDoc.newDocument(true).addRoot("contextRoot");
for (final String context : contextes) {
for (final String context : contexts) {
String id = "";
for (final String token : Splitter.on("::").split(context)) {
@@ -882,7 +885,7 @@
return buffer.toString();
}
private void fillContextMap(final String xmlTree) {
private void fillContextMap(final String xmlTree, final Set<String> contexts) {
Document fundingPath;
try {
@@ -896,7 +899,7 @@
if (funder != null) {
final String funderShortName = funder.valueOf("./shortname");
contextes.add(funderShortName);
contexts.add(funderShortName);
contextMapper.put(funderShortName, new ContextDef(funderShortName, funder.valueOf("./name"), "context", "funding"));
final Node level0 = fundingPath.selectSingleNode("//funding_level_0");
@@ -905,17 +908,17 @@
contextMapper.put(level0Id, new ContextDef(level0Id, level0.valueOf("./description"), "category", ""));
final Node level1 = fundingPath.selectSingleNode("//funding_level_1");
if (level1 == null) {
contextes.add(level0Id);
contexts.add(level0Id);
} else {
final String level1Id = Joiner.on("::").join(level0Id, level1.valueOf("./name"));
contextMapper.put(level1Id, new ContextDef(level1Id, level1.valueOf("./description"), "concept", ""));
final Node level2 = fundingPath.selectSingleNode("//funding_level_2");
if (level2 == null) {
contextes.add(level1Id);
contexts.add(level1Id);
} else {
final String level2Id = Joiner.on("::").join(level1Id, level2.valueOf("./name"));
contextMapper.put(level2Id, new ContextDef(level2Id, level2.valueOf("./description"), "concept", ""));
contextes.add(level2Id);
contexts.add(level2Id);
}
}
}

View File

@@ -1,66 +0,0 @@
package eu.dnetlib.dhp.graph;
import eu.dnetlib.dhp.graph.model.EntityRelEntity;
import eu.dnetlib.dhp.graph.model.RelatedEntity;
import eu.dnetlib.dhp.graph.utils.GraphMappingUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.io.InputStreamReader;
public class MappingUtilsTest {
private GraphMappingUtils utils;
@Before
public void setUp() {
utils = new GraphMappingUtils();
}
@Test
public void testOafMappingDatasource() throws IOException {
final InputStreamReader in = new InputStreamReader(getClass().getResourceAsStream("datasource.json"));
final EntityRelEntity e = new ObjectMapper().readValue(in, EntityRelEntity.class);
e.getSource().setType("datasource");
final EntityRelEntity out = utils.asRelatedEntity(e);
System.out.println(out);
}
//@Test
public void testOafMappingResult() throws IOException {
final InputStreamReader in = new InputStreamReader(getClass().getResourceAsStream("result.json"));
final EntityRelEntity e = new ObjectMapper().readValue(in, EntityRelEntity.class);
final EntityRelEntity out = utils.asRelatedEntity(e);
System.out.println(out);
}
@Test
public void testOafMappingSoftware() throws IOException {
final InputStreamReader in = new InputStreamReader(getClass().getResourceAsStream("software.json"));
final EntityRelEntity e = new ObjectMapper().readValue(in, EntityRelEntity.class);
final EntityRelEntity out = utils.asRelatedEntity(e);
System.out.println(out);
}
@Test
public void testParseRelatedEntity() throws IOException {
final InputStreamReader in = new InputStreamReader(getClass().getResourceAsStream("related_entity.json"));
final RelatedEntity e = new ObjectMapper().readValue(in, RelatedEntity.class);
System.out.println(e);
}
}

View File

@@ -1,55 +0,0 @@
package eu.dnetlib.dhp.graph;
import eu.dnetlib.dhp.graph.utils.ContextMapper;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.sql.SparkSession;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
public class XmlRecordFactoryTest {
private static final Log log = LogFactory.getLog(XmlRecordFactoryTest.class);
private Path testDir;
@Before
public void setup() throws IOException {
testDir = Files.createTempDirectory(getClass().getSimpleName());
log.info("created test directory " + testDir.toString());
}
@After
public void tearDown() throws IOException {
FileUtils.deleteDirectory(testDir.toFile());
log.info("deleted test directory " + testDir.toString());
}
@Test
public void testXmlSerialization() throws Exception {
final SparkSession spark = SparkSession
.builder()
.appName(SparkXmlRecordBuilderJob.class.getSimpleName())
.master("local[*]")
.getOrCreate();
final String inputDir = testDir.toString() + "/3_joined_entities";
FileUtils.forceMkdir(new File(inputDir));
FileUtils.copyFile(new File("/Users/claudio/Downloads/joined_entities-part-00000"), new File(inputDir + "/joined_entities-part-00000"));
final ContextMapper ctx = ContextMapper.fromIS("https://dev-openaire.d4science.org:443/is/services/isLookUp");
final GraphJoiner g = new GraphJoiner(spark, ctx, inputDir, testDir.toString());
g.asXML();
}
}

View File

@@ -1,5 +0,0 @@
{
"id": "20|nih_________::6b8108b6d6399f7163a6a7ccdd0efc2d",
"type": "organization",
"legalname": "MCGILL UNIVERSITY"
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

pom.xml
View File

@@ -76,7 +76,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
@@ -129,6 +129,13 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>1.7.25</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@@ -474,6 +481,7 @@
<dhp.jackson.version>2.9.6</dhp.jackson.version>
<dhp.commons.lang.version>3.5</dhp.commons.lang.version>
<scala.version>2.11.12</scala.version>
<junit.version>4.12</junit.version>
<mongodb.driver.version>3.4.2</mongodb.driver.version>
</properties>
</project>