parent 2e0999a1df
commit e87b790a60
@@ -82,9 +82,9 @@ public class SendToZenodoHDFS implements Serializable {
			if (!pString.endsWith("_SUCCESS")) {
				String name = pString.substring(pString.lastIndexOf("/") + 1);

				FSDataInputStream inputStream = fileSystem.open(p);
				zenodoApiClient.uploadIS(inputStream, name);

				try (FSDataInputStream inputStream = fileSystem.open(p)) {
					zenodoApiClient.uploadIS(inputStream, name);
				}
			}

		}
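Note on the hunk above: wrapping fileSystem.open(p) in try-with-resources guarantees that the HDFS input stream is closed even when the upload throws. A minimal, self-contained sketch of the same pattern, assuming only the Hadoop client on the classpath; the path is a hypothetical placeholder and the println stands in for zenodoApiClient.uploadIS:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TryWithResourcesSketch {
	public static void main(String[] args) throws Exception {
		FileSystem fileSystem = FileSystem.get(new Configuration());
		Path p = new Path("/tmp/dump/part-00000"); // hypothetical input path
		String name = p.getName();
		try (FSDataInputStream inputStream = fileSystem.open(p)) {
			// the stream is closed automatically, also when the body throws
			System.out.println(name + ": first byte = " + inputStream.read());
		}
	}
}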
@@ -2,6 +2,7 @@
package eu.dnetlib.dhp.oa.graph.dump.csv;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static org.apache.commons.lang3.StringUtils.remove;
import static org.apache.commons.lang3.StringUtils.split;

import java.io.Serializable;
@@ -87,6 +88,7 @@ public class SparkDumpResults implements Serializable {
		Class<R> inputClazz, String resultType, String workingPath) {

		Dataset<String> resultIds = spark.read().textFile(workingPath + "/resultIds");
		// resultIds.foreach((ForeachFunction<String>) r -> System.out.println(r));
		Dataset<R> results = Utils
			.readPath(spark, inputPath + "/" + resultType, inputClazz)
			.filter(
@@ -108,8 +110,6 @@ public class SparkDumpResults implements Serializable {
				Encoders.bean(CSVResult.class))
			.write()
			.option("compression", "gzip")
			// .option("header", "true")
			// .option("delimiter", Constants.SEP)
			.mode(SaveMode.Overwrite)
			.json(workingPath + "/" + resultType + "/result");

@@ -125,8 +125,6 @@ public class SparkDumpResults implements Serializable {
			.filter(Objects::nonNull)
			.write()
			.option("compression", "gzip")
			// .option("header", "true")
			// .option("delimiter", Constants.SEP)
			.mode(SaveMode.Overwrite)
			.json(workingPath + "/" + resultType + "/result_pid");

@@ -186,8 +184,6 @@ public class SparkDumpResults implements Serializable {
				Encoders.bean(CSVRelResAut.class))
			.write()
			.option("compression", "gzip")
			// .option("header", "true")
			// .option("delimiter", Constants.SEP)
			.mode(SaveMode.Overwrite)
			.json(workingPath + "/" + resultType + "/result_author");

@@ -199,8 +195,6 @@ public class SparkDumpResults implements Serializable {
				Encoders.bean(CSVAuthor.class))
			.write()
			.option("compression", "gzip")
			// .option("header", "true")
			// .option("delimiter", Constants.SEP)
			.mode(SaveMode.Overwrite)
			.json(workingPath + "/" + resultType + "/author");

@@ -264,7 +258,7 @@ public class SparkDumpResults implements Serializable {
	private static String getFieldValue(Field<String> input) {
		if (input != null &&
			StringUtils.isNotEmpty(input.getValue())) {
			return input.getValue();
			return removeBreaks(input.getValue());
		} else {
			return "";
		}
@@ -283,7 +277,7 @@ public class SparkDumpResults implements Serializable {
		if (Optional.ofNullable(r.getSubject()).isPresent())
			ret.setKeywords(String.join(", ", r.getSubject().stream().map(s -> {
				if (StringUtils.isNotEmpty(s.getValue()))
					return s.getValue().toLowerCase();
					return removeBreaks(s.getValue().toLowerCase());
				else
					return null;
			}).filter(Objects::nonNull).distinct().collect(Collectors.toList())));
@@ -311,7 +305,7 @@ public class SparkDumpResults implements Serializable {
			return "";
		for (Field<String> abs : description) {
			if (StringUtils.isNotEmpty(abs.getValue())) {
				return abs.getValue();
				return removeBreaks(abs.getValue());
			}
		}
		return "";
@@ -322,14 +316,22 @@ public class SparkDumpResults implements Serializable {
		for (StructuredProperty title : titles) {
			if (StringUtils.isEmpty(firstTitle)) {
				if (StringUtils.isNotEmpty(title.getValue()))
					firstTitle = title.getValue();
					firstTitle = removeBreaks(title.getValue());
			}
			if (title.getQualifier().getClassid().equals(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid())) {
				if (StringUtils.isNotEmpty(title.getValue()))
					return title.getValue();
					return removeBreaks(title.getValue());
			}
		}
		if (firstTitle != null) {
			return removeBreaks(firstTitle);
		}
		return "";
	}

	private static String removeBreaks(String input) {
		return input.replace("\n", " ").replace("\t", " ").replace("\r", " ");

	}

}
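Note on the SparkDumpResults hunks above: the string accessors now route every value through the new removeBreaks helper, presumably so that embedded newlines, tabs or carriage returns cannot split a record of the CSV dump across several lines. A tiny stand-alone sketch of what the helper does (the input string is made up for illustration):

public class RemoveBreaksSketch {

	// same body as the helper introduced in the commit
	private static String removeBreaks(String input) {
		return input.replace("\n", " ").replace("\t", " ").replace("\r", " ");
	}

	public static void main(String[] args) {
		System.out.println(removeBreaks("first line\nsecond line\twith a tab\r"));
		// prints: first line second line with a tab
	}
}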
@@ -0,0 +1,241 @@

package eu.dnetlib.dhp.oa.graph.dump.serafeim;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.graph.dump.csv.Constants;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;

/**
 * @author miriam.baglioni
 * @Date 04/05/23
 */
//STEP 2
public class SparkSelectResultsAndDumpRelations implements Serializable {

	private static final Logger log = LoggerFactory.getLogger(SparkSelectResultsAndDumpRelations.class);
	private static String RESULT_COMMUNITY_TABLE = "/result_community";
	private static String COMMUNITY_RESULT_IDS = "/communityResultIds";

	public static void main(String[] args) throws Exception {
		String jsonConfiguration = IOUtils
			.toString(
				SparkSelectResultsAndDumpRelations.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/oa/graph/dump/input_dump_csv_ste2.json"));

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		final String inputPath = parser.get("sourcePath");
		log.info("inputPath: {}", inputPath);

		final String outputPath = parser.get("outputPath");
		log.info("outputPath: {}", outputPath);

		final String workingPath = parser.get("workingPath");

		List<String> communityList = null;
		Optional<String> communities = Optional.ofNullable(parser.get("communities"));
		if (communities.isPresent()) {
			communityList = Arrays.asList(communities.get().split(";"));
		}

		SparkConf conf = new SparkConf();

		List<String> finalCommunityList = communityList;
		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> {
				Utils.removeOutputDir(spark, outputPath);
				run(spark, inputPath, outputPath, workingPath, finalCommunityList);

			});

	}

	private static void run(SparkSession spark, String inputPath, String outputPath,
		String workingPath,
		List<String> communityList) {

		// select the result ids related to the set of communities considered
		writeCommunityRelatedIds(
			spark, inputPath, Publication.class, communityList, workingPath, "publication");
		writeCommunityRelatedIds(
			spark, inputPath, Dataset.class, communityList, workingPath, "dataset");
		writeCommunityRelatedIds(
			spark, inputPath, Software.class, communityList, workingPath, "software");
		writeCommunityRelatedIds(
			spark, inputPath, OtherResearchProduct.class, communityList,
			workingPath, "otherresearchproduct");

		// select the relations with semantics cites
		org.apache.spark.sql.Dataset<Relation> relations = Utils
			.readPath(spark, inputPath + "/relation", Relation.class)
			.filter(
				(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
					r.getRelClass().equals(ModelConstants.CITES));

		// select the relations having as source one of the results related to the
		// communities
		org.apache.spark.sql.Dataset<String> communityResultIds = spark
			.read()
			.textFile(workingPath + COMMUNITY_RESULT_IDS)
			.distinct();

		Utils
			.readPath(spark, inputPath + "/publication", Publication.class)
			.filter(
				(FilterFunction<Publication>) p -> !p.getDataInfo().getDeletedbyinference()
					&& !p.getDataInfo().getInvisible())
			.map((MapFunction<Publication, String>) p -> p.getId(), Encoders.STRING())
			.union(
				Utils
					.readPath(spark, inputPath + "/dataset", Dataset.class)
					.filter(
						(FilterFunction<Dataset>) p -> !p.getDataInfo().getDeletedbyinference()
							&& !p.getDataInfo().getInvisible())
					.map((MapFunction<Dataset, String>) p -> p.getId(), Encoders.STRING()))
			.union(
				Utils
					.readPath(spark, inputPath + "/software", Software.class)
					.filter(
						(FilterFunction<Software>) p -> !p.getDataInfo().getDeletedbyinference()
							&& !p.getDataInfo().getInvisible())
					.map((MapFunction<Software, String>) p -> p.getId(), Encoders.STRING()))
			.union(
				Utils
					.readPath(spark, inputPath + "/otherresearchproduct", OtherResearchProduct.class)
					.filter(
						(FilterFunction<OtherResearchProduct>) p -> !p.getDataInfo().getDeletedbyinference()
							&& !p.getDataInfo().getInvisible())
					.map((MapFunction<OtherResearchProduct, String>) p -> p.getId(), Encoders.STRING()))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.text(workingPath + "/resultIds");

		org.apache.spark.sql.Dataset<String> resultIds = spark.read().textFile(workingPath + "/resultIds");

		org.apache.spark.sql.Dataset<Relation> oksource = communityResultIds
			.joinWith(relations, communityResultIds.col("value").equalTo(relations.col("source")))
			.map(
				(MapFunction<Tuple2<String, Relation>, Relation>) t2 -> t2._2(),
				Encoders.bean(Relation.class));
		oksource
			.joinWith(resultIds, oksource.col("target").equalTo(resultIds.col("value")))
			.map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class))
			.write()
			.option("compression", "gzip")
			.mode(SaveMode.Overwrite)
			.json(outputPath + "/relation");

		writeNodes(
			spark, inputPath + "/publication", Publication.class, outputPath + "/publication",
			outputPath + "/relation", workingPath);
		writeNodes(
			spark, inputPath + "/dataset", Dataset.class, outputPath + "/dataset", outputPath + "/relation",
			workingPath);
		writeNodes(
			spark, inputPath + "/software", Software.class, outputPath + "/software", outputPath + "/relation",
			workingPath);
		writeNodes(
			spark, inputPath + "/otherresearchproduct", OtherResearchProduct.class,
			outputPath + "/otherresearchproduct", outputPath + "/relation", workingPath);

	}

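Note, not part of the listed file: in run() above a relation is kept only when its relClass is Cites, it is not deleted by inference, its source appears among the community-related ids and its target among the ids of all valid results; both id checks are joinWith joins against id lists previously written under the working path. A toy, self-contained sketch of that double join, using a made-up Rel bean and hard-coded ids instead of the real Relation class and the HDFS inputs:

import java.util.Arrays;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class CitesSelectionSketch {

	public static class Rel implements java.io.Serializable {
		private String source;
		private String target;

		public Rel() {
		}

		public Rel(String source, String target) {
			this.source = source;
			this.target = target;
		}

		public String getSource() {
			return source;
		}

		public void setSource(String source) {
			this.source = source;
		}

		public String getTarget() {
			return target;
		}

		public void setTarget(String target) {
			this.target = target;
		}
	}

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();

		Dataset<Rel> relations = spark
			.createDataset(
				Arrays.asList(new Rel("A", "B"), new Rel("B", "C"), new Rel("C", "A")), Encoders.bean(Rel.class));
		Dataset<String> communityResultIds = spark.createDataset(Arrays.asList("A"), Encoders.STRING());
		Dataset<String> resultIds = spark.createDataset(Arrays.asList("A", "B"), Encoders.STRING());

		// first join: the relation source must be a community-related result
		Dataset<Rel> okSource = communityResultIds
			.joinWith(relations, communityResultIds.col("value").equalTo(relations.col("source")))
			.map((MapFunction<Tuple2<String, Rel>, Rel>) t2 -> t2._2(), Encoders.bean(Rel.class));

		// second join: the relation target must be a known result id
		okSource
			.joinWith(resultIds, okSource.col("target").equalTo(resultIds.col("value")))
			.map((MapFunction<Tuple2<Rel, String>, Rel>) t2 -> t2._1(), Encoders.bean(Rel.class))
			.show(false);

		spark.stop();
	}
}

With this toy data only the relation A -> B survives: A is community related and B is a known result id.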
	private static <R extends Result> void writeNodes(SparkSession spark, String inputPath, Class<R> clazz,
		String outputPath, String relationPath, String workingPath) {
		org.apache.spark.sql.Dataset<Relation> citingRelations = Utils.readPath(spark, relationPath, Relation.class);
		org.apache.spark.sql.Dataset<R> result = Utils
			.readPath(spark, inputPath, clazz)
			.filter(
				(FilterFunction<R>) p -> !p.getDataInfo().getDeletedbyinference() &&
					!p.getDataInfo().getInvisible());

		// take the distinct result id for source and target of the relations
		citingRelations
			.flatMap(
				(FlatMapFunction<Relation, String>) r -> Arrays
					.asList(r.getSource(), r.getTarget())
					.iterator(),
				Encoders.STRING())
			.distinct()
			.write()
			.option("compression", "gzip")
			.mode(SaveMode.Overwrite)
			.text(workingPath + "/relationIds");

		org.apache.spark.sql.Dataset<String> relationIds = spark.read().textFile(workingPath + "/relationIds");

		relationIds
			.joinWith(result, relationIds.col("value").equalTo(result.col("id")))
			.map((MapFunction<Tuple2<String, R>, R>) t2 -> t2._2(), Encoders.bean(clazz))
			.write()
			.option("compression", "gzip")
			.mode(SaveMode.Overwrite)
			.json(outputPath);
	}

	private static <R extends Result> void writeCommunityRelatedIds(SparkSession spark, String inputPath,
		Class<R> clazz, List<String> communityList, String outputPath, String resultType) {
		org.apache.spark.sql.Dataset<R> results = Utils
			.readPath(spark, inputPath + "/" + resultType, clazz)
			.filter(
				(FilterFunction<R>) p -> !p.getDataInfo().getDeletedbyinference() &&
					!p.getDataInfo().getInvisible() &&
					isRelatedToCommunities(p, communityList));
		results
			.map((MapFunction<R, String>) Result::getId, Encoders.STRING())
			.write()
			.option("compression", "gzip")
			.mode(SaveMode.Append)
			.text(outputPath + COMMUNITY_RESULT_IDS);

		// results
		// // .repartition(10000)
		// .write()
		// .option("compression", "gzip")
		// .mode(SaveMode.Append)
		// .json(outputPath + "/" + resultType);

	}

	private static <R extends Result> boolean isRelatedToCommunities(R p, List<String> communityList) {
		return p
			.getContext()
			.stream()
			.anyMatch(
				c -> communityList.contains(c.getId()) ||
					(c.getId().contains("::")
						&& communityList.contains(c.getId().substring(0, c.getId().indexOf("::")))));
	}

}
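Note, not part of the listed file: isRelatedToCommunities above treats a result as community related when one of its context ids matches a requested community either exactly or by its prefix before "::". A plain-Java sketch of that rule, with made-up context and community ids:

import java.util.Arrays;
import java.util.List;

public class CommunityMatchSketch {

	// same condition as isRelatedToCommunities, applied to a single context id
	static boolean matches(String contextId, List<String> communityList) {
		return communityList.contains(contextId) ||
			(contextId.contains("::")
				&& communityList.contains(contextId.substring(0, contextId.indexOf("::"))));
	}

	public static void main(String[] args) {
		List<String> communities = Arrays.asList("dh-ch", "enermaps");
		System.out.println(matches("dh-ch", communities)); // true: exact match
		System.out.println(matches("dh-ch::subcommunity", communities)); // true: prefix before "::" matches
		System.out.println(matches("beopen", communities)); // false
	}
}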
@@ -65,7 +65,7 @@
			</property>
		</configuration>
	</global>
	<start to="fork_dump_result_author_pid" />
	<start to="dump_communities" />

	<action name="dump_communities">
		<java>
@@ -0,0 +1,30 @@
<configuration>
	<property>
		<name>jobTracker</name>
		<value>yarnRM</value>
	</property>
	<property>
		<name>nameNode</name>
		<value>hdfs://nameservice1</value>
	</property>
	<property>
		<name>oozie.use.system.libpath</name>
		<value>true</value>
	</property>
	<property>
		<name>hiveMetastoreUris</name>
		<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
	</property>
	<property>
		<name>hiveJdbcUrl</name>
		<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
	</property>
	<property>
		<name>hiveDbName</name>
		<value>openaire</value>
	</property>
	<property>
		<name>oozie.launcher.mapreduce.user.classpath.first</name>
		<value>true</value>
	</property>
</configuration>
@@ -0,0 +1,102 @@
<workflow-app name="dump_graph_csv" xmlns="uri:oozie:workflow:0.5">
	<parameters>
		<property>
			<name>sourcePath</name>
			<description>the source path</description>
		</property>
		<property>
			<name>outputPath</name>
			<description>the output path</description>
		</property>
		<property>
			<name>communities</name>
			<description>the communities whose products should be dumped</description>
		</property>
		<property>
			<name>sparkDriverMemory</name>
			<description>memory for driver process</description>
		</property>
		<property>
			<name>sparkExecutorMemory</name>
			<description>memory for individual executor</description>
		</property>
		<property>
			<name>sparkExecutorCores</name>
			<description>number of cores used by single executor</description>
		</property>
		<property>
			<name>oozieActionShareLibForSpark2</name>
			<description>oozie action sharelib for spark 2.*</description>
		</property>
		<property>
			<name>spark2ExtraListeners</name>
			<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
			<description>spark 2.* extra listeners classname</description>
		</property>
		<property>
			<name>spark2SqlQueryExecutionListeners</name>
			<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
			<description>spark 2.* sql query execution listeners classname</description>
		</property>
		<property>
			<name>spark2YarnHistoryServerAddress</name>
			<description>spark 2.* yarn history server address</description>
		</property>
		<property>
			<name>spark2EventLogDir</name>
			<description>spark 2.* event log dir location</description>
		</property>
	</parameters>
	<global>
		<job-tracker>${jobTracker}</job-tracker>
		<name-node>${nameNode}</name-node>
		<configuration>
			<property>
				<name>mapreduce.job.queuename</name>
				<value>${queueName}</value>
			</property>
			<property>
				<name>oozie.launcher.mapred.job.queue.name</name>
				<value>${oozieLauncherQueueName}</value>
			</property>
			<property>
				<name>oozie.action.sharelib.for.spark</name>
				<value>${oozieActionShareLibForSpark2}</value>
			</property>
		</configuration>
	</global>
	<start to="select_result_dump_relation" />

	<action name="select_result_dump_relation">
		<spark xmlns="uri:oozie:spark-action:0.2">
			<master>yarn</master>
			<mode>cluster</mode>
			<name>select results ids connected to communities and dump relation </name>
			<class>eu.dnetlib.dhp.oa.graph.dump.serafeim.SparkSelectResultsAndDumpRelations</class>
			<jar>dump-${projectVersion}.jar</jar>
			<spark-opts>
				--executor-memory=10G
				--executor-cores=3
				--driver-memory=10G
				--conf spark.extraListeners=${spark2ExtraListeners}
				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
				--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
				--conf spark.sql.shuffle.partitions=3840
			</spark-opts>
			<arg>--sourcePath</arg><arg>${sourcePath}</arg>
			<arg>--workingPath</arg><arg>${workingDir}</arg>
			<arg>--outputPath</arg><arg>${outputPath}</arg>
			<arg>--communities</arg><arg>${communities}</arg>
		</spark>
		<ok to="End"/>
		<error to="Kill"/>
	</action>

	<kill name="Kill">
		<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
	</kill>

	<end name="End"/>
</workflow-app>
@@ -13,7 +13,10 @@ import java.util.Optional;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.dom4j.Document;
@@ -30,8 +33,12 @@ import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVResult;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Function1;

/**
 * @author miriam.baglioni
@@ -96,7 +103,7 @@ public class DumpResultTest {

		SparkDumpResults.main(new String[] {
			"-isSparkSessionManaged", Boolean.FALSE.toString(),
			"-outputPath", workingDir.toString() + "/output",

			"-workingPath", workingDir.toString() + "/working",
			"-resultType", "publication",
			"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
@@ -105,61 +112,69 @@ public class DumpResultTest {

		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

		Dataset<Row> tmp = spark
			.read()
			.option("header", "true")
			.option("delimiter", Constants.SEP)
			.csv(workingDir.toString() + "/working/publication/result");
		Dataset<CSVResult> tmp = Utils
			.readPath(spark, workingDir.toString() + "/working/publication/result", CSVResult.class);

		Assertions.assertEquals(3, tmp.count());
		Row row = tmp
			.where("id = '50|DansKnawCris::0224aae28af558f21768dbc6439c7a95'")
		tmp.show(false);

		Assertions.assertEquals(4, tmp.count());
		CSVResult row = tmp
			.filter(
				(FilterFunction<CSVResult>) r -> r.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439c7a95"))
			.first();
		Assertions.assertEquals(ModelConstants.OPEN_ACCESS_RIGHT().getClassid(), row.getAs("accessright"));
		Assertions.assertEquals("FI", row.getAs("country"));
		Assertions.assertEquals("Lit.opg., bijl.", row.getAs("description"));
		Assertions.assertEquals(3, split(row.getAs("keywords"), ", ").length);
		Assertions.assertTrue(row.getAs("keywords").toString().contains("archeologie"));
		Assertions.assertTrue(row.getAs("keywords").toString().contains("prospectie"));
		Assertions.assertTrue(row.getAs("keywords").toString().contains("archaeology"));
		Assertions.assertEquals("nl", row.getAs("language"));
		Assertions.assertEquals("2007-01-01", row.getAs("publication_date"));
		Assertions.assertEquals("FakePublisher1", row.getAs("publisher"));
		Assertions.assertEquals(ModelConstants.OPEN_ACCESS_RIGHT().getClassid(), row.getAccessright());
		Assertions.assertEquals("FI", row.getCountry());
		Assertions.assertEquals("Lit.opg., bijl.", row.getDescription());
		Assertions.assertEquals(3, split(row.getKeywords(), ", ").length);
		Assertions.assertTrue(row.getKeywords().toString().contains("archeologie"));
		Assertions.assertTrue(row.getKeywords().toString().contains("prospectie"));
		Assertions.assertTrue(row.getKeywords().toString().contains("archaeology"));
		Assertions.assertEquals("nl", row.getLanguage());
		Assertions.assertEquals("2007-01-01", row.getPublication_date());
		Assertions.assertEquals("FakePublisher1", row.getPublisher());
		Assertions
			.assertEquals(
				"Inventariserend veldonderzoek d.m.v. boringen (karterende fase) : Raadhuisstraat te Dirkshorn, gemeente Harenkarspel",
				row.getAs("title"));
		Assertions.assertEquals("publication", row.getAs("type"));
				row.getTitle());
		Assertions.assertEquals("publication", row.getType());

		row = tmp
			.where("id = '50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9'")
			.filter(
				(FilterFunction<CSVResult>) r -> r.getId().equals("50|doi_________::715fec7723208e6f17e855c204656e2f"))
			.first();
		Assertions.assertEquals(ModelConstants.OPEN_ACCESS_RIGHT().getClassid(), row.getAs("accessright"));
		Assertions.assertEquals(2, split(row.getAs("country"), ", ").length);
		Assertions.assertNull(row.getAs("description"));
		Assertions.assertEquals(2, split(row.getAs("keywords"), ", ").length);
		Assertions.assertTrue(row.getAs("keywords").toString().contains("archeologie"));
		Assertions.assertTrue(row.getAs("keywords").toString().contains("archaeology"));
		Assertions.assertEquals("UNKNOWN", row.getAs("language"));
		Assertions.assertNull(row.getAs("publication_date"));
		Assertions.assertNull(row.getAs("publisher"));
		Assertions.assertEquals("None", row.getAs("title"));
		Assertions.assertEquals("publication", row.getAs("type"));

		row = tmp
			.where("id = '50|DansKnawCris::26780065282e607306372abd0d808245'")
			.first();
		Assertions.assertEquals(ModelConstants.OPEN_ACCESS_RIGHT().getClassid(), row.getAs("accessright"));
		Assertions.assertNull(row.getAs("country"));
		Assertions.assertNull(row.getAs("description"));
		Assertions.assertEquals(2, split(row.getAs("keywords"), ", ").length);
		Assertions.assertTrue(row.getAs("keywords").toString().contains("archeologie"));
		Assertions.assertTrue(row.getAs("keywords").toString().contains("archaeology"));
		Assertions.assertEquals("UNKNOWN", row.getAs("language"));
		Assertions.assertNull(row.getAs("publication_date"));
		Assertions.assertNull(row.getAs("publisher"));
		Assertions.assertEquals("None", row.getAs("title"));
		Assertions.assertEquals("publication", row.getAs("type"));
		System.out.println(row.getPublisher());
		String a = row.getPublisher().replace("\\n", " ");
		System.out.println(a);
		// row = tmp
		// .where("id = '50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9'")
		// .first();
		// Assertions.assertEquals(ModelConstants.OPEN_ACCESS_RIGHT().getClassid(), row.getAs("accessright"));
		// Assertions.assertEquals(2, split(row.getAs("country"), ", ").length);
		// Assertions.assertNull(row.getAs("description"));
		// Assertions.assertEquals(2, split(row.getAs("keywords"), ", ").length);
		// Assertions.assertTrue(row.getAs("keywords").toString().contains("archeologie"));
		// Assertions.assertTrue(row.getAs("keywords").toString().contains("archaeology"));
		// Assertions.assertEquals("UNKNOWN", row.getAs("language"));
		// Assertions.assertNull(row.getAs("publication_date"));
		// Assertions.assertNull(row.getAs("publisher"));
		// Assertions.assertEquals("None", row.getAs("title"));
		// Assertions.assertEquals("publication", row.getAs("type"));
		//
		// row = tmp
		// .where("id = '50|DansKnawCris::26780065282e607306372abd0d808245'")
		// .first();
		// Assertions.assertEquals(ModelConstants.OPEN_ACCESS_RIGHT().getClassid(), row.getAs("accessright"));
		// Assertions.assertNull(row.getAs("country"));
		// Assertions.assertNull(row.getAs("description"));
		// Assertions.assertEquals(2, split(row.getAs("keywords"), ", ").length);
		// Assertions.assertTrue(row.getAs("keywords").toString().contains("archeologie"));
		// Assertions.assertTrue(row.getAs("keywords").toString().contains("archaeology"));
		// Assertions.assertEquals("UNKNOWN", row.getAs("language"));
		// Assertions.assertNull(row.getAs("publication_date"));
		// Assertions.assertNull(row.getAs("publisher"));
		// Assertions.assertEquals("None", row.getAs("title"));
		// Assertions.assertEquals("publication", row.getAs("type"));

	}

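Note on the test changes above: the assertions no longer parse the dump as CSV with header and delimiter options; they read the JSON output back as CSVResult beans through Utils.readPath and use typed getters. A minimal sketch of reading a JSON-lines dump into a bean-typed Dataset (not necessarily how Utils.readPath is implemented; the Record bean and the path are hypothetical stand-ins for CSVResult and the test working directory):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class ReadDumpAsBeansSketch {

	// hypothetical stand-in for eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVResult
	public static class Record implements java.io.Serializable {
		private String id;
		private String type;

		public String getId() {
			return id;
		}

		public void setId(String id) {
			this.id = id;
		}

		public String getType() {
			return type;
		}

		public void setType(String type) {
			this.type = type;
		}
	}

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();

		Dataset<Record> records = spark
			.read()
			.json("/tmp/working/publication/result") // hypothetical path; gzip JSON lines are read transparently
			.as(Encoders.bean(Record.class));

		records.show(false);
		System.out.println(records.count());

		spark.stop();
	}
}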
@@ -98,12 +98,14 @@ public class MoveOnSingleDirTest {
			.option("delimiter", Constants.SEP)
			.csv(workingDir.toString() + "/output/result");

		Assertions.assertEquals(21, tmp.count());
		Assertions.assertEquals(22, tmp.count());
		Assertions.assertEquals(12, tmp.filter("type == 'dataset'").count());
		Assertions.assertEquals(4, tmp.filter("type == 'other'").count());
		Assertions.assertEquals(4, tmp.filter("type == 'publication'").count());
		Assertions.assertEquals(5, tmp.filter("type == 'publication'").count());
		Assertions.assertEquals(1, tmp.filter("type == 'software'").count());

		tmp.filter("type == 'publication'").show(false);

		Assertions
			.assertEquals(
				8, spark
File diff suppressed because one or more lines are too long
@@ -1,4 +1,5 @@
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"10|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","subRelType":"provision","target":"20|dedup_wf_001::2899e571609779168222fdeb59cb916d"}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"Cites","relType":"datasourceOrganization","source":"50|DansKnawCris::26780065282e607306372abd0d808245","subRelType":"provision","target":"50|DansKnawCris::26780065282e607306372abd0d808246"}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"Cites","relType":"datasourceOrganization","source":"50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9","subRelType":"provision","target":"50|DansKnawCris::26780065282e607306372abd0d808245"}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"Cites","relType":"datasourceOrganization","source":"50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9","subRelType":"provision","target":"50|DansKnawCris::0224aae28af558f21768dbc6439c7a95"}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"Cites","relType":"datasourceOrganization","source":"50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9","subRelType":"provision","target":"50|DansKnawCris::0224aae28af558f21768dbc6439c7a95"}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"Cites","relType":"datasourceOrganization","source":"50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9","subRelType":"provision","target":"50|doi_________::715fec7723208e6f17e855c204656e2f"}
@@ -1,4 +1,5 @@
{"accessright":"OPEN","country":"","description":"We describe the CoNLL-2002 shared task: language-independent named entity recognition. We give background information on the data sets and the evaluation method, present a general overview of the systems that have taken part in the task and discuss their performance.","id":"50|doi_dedup___::13b14c741a7b3420591c161f54ed5c80","keywords":"computer science - computation and language, i.2.7, computation and language (cs.cl), fos: computer and information sciences","language":"eng","publication_date":"2002-09-05","publisher":"","title":"Introduction to the CoNLL-2002 Shared Task: Language-Independent Named Entity Recognition","type":"publication"}
{"accessright":"OPEN","country":"GB","description":"Following a strategy similar to that used in baker's yeast (Herrgård et al. Nat Biotechnol 26:1155-1160, 2008). A consensus yeast metabolic network obtained from a community approach to systems biology (Herrgård et al. 2008; Dobson et al. BMC Syst Biol 4:145, 2010). Further developments towards a genome-scale metabolic model of yeast (Dobson et al. 2010; Heavner et al. BMC Syst Biol 6:55, 2012). Yeast 5-an expanded reconstruction of the Saccharomyces cerevisiae metabolic network (Heavner et al. 2012) and in Salmonella typhimurium (Thiele et al. BMC Syst Biol 5:8, 2011). A community effort towards a knowledge-base and mathematical model of the human pathogen Salmonellatyphimurium LT2 (Thiele et al. 2011), a recent paper (Thiele et al. Nat Biotechnol 31:419-425, 2013). A community-driven global reconstruction of human metabolism (Thiele et al. 2013) described a much improved 'community consensus' reconstruction of the human metabolic network, called Recon 2, and the authors (that include the present ones) have made it freely available via a database at http://humanmetabolism.org/ and in SBML format at Biomodels (http://identifiers.org/biomodels.db/MODEL1109130000. This short analysis summarises the main findings, and suggests some approaches that will be able to exploit the availability of this model to advantage. © 2013 The Author(s).","id":"50|doi_dedup___::e0392f427fea9a701aa469e6f24bdf93","keywords":"review article, metabolism, modelling, systems biology, networks, metabolic networks, clinical biochemistry, biochemistry, endocrinology, diabetes and metabolism, community approach, operations research, metabolic network, human metabolism, metabolic model, biology, computational biology, sbml, 03 medical and health sciences, 0302 clinical medicine, 0303 health sciences, 030220 oncology & carcinogenesis, 030304 developmental biology, researchinstitutes_networks_beacons/manchester_institute_of_biotechnology, manchester institute of biotechnology","language":"eng","publication_date":"2013-08-01","publisher":"Springer US","title":"An analysis of a ‘community-driven’ reconstruction of the human metabolic network","type":"publication"}
{"accessright":"OPEN","country":"","description":"Current machine learning systems operate, almost exclusively, in a statistical, or model-free mode, which entails severe theoretical limits on their power and performance. Such systems cannot reason about interventions and retrospection and, therefore, cannot serve as the basis for strong AI. To achieve human level intelligence, learning machines need the guidance of a model of reality, similar to the ones used in causal inference tasks. To demonstrate the essential role of such models, I will present a summary of seven tasks which are beyond reach of current machine learning systems and which have been accomplished using the tools of causal modeling.","id":"50|doi_dedup___::2436e90941a664931b54b956ade5b77b","keywords":"machine learning (cs.lg), artificial intelligence (cs.ai), machine learning (stat.ml), fos: computer and information sciences, mode (statistics), causal inference, artificial intelligence, business.industry, business, power (physics), computer science, machine learning, computer.software_genre, computer, basis (linear algebra), 03 medical and health sciences, 02 engineering and technology, 0202 electrical engineering, electronic engineering, information engineering, 0301 basic medicine, 020201 artificial intelligence & image processing, 030104 developmental biology, computer science - learning, computer science - artificial intelligence, statistics - machine learning","language":"und","publication_date":"2018-02-02","publisher":"arXiv","title":"Theoretical Impediments to Machine Learning With Seven Sparks from the Causal Revolution","type":"publication"}
{"accessright":"OPEN","country":"","description":"In most natural and engineered systems, a set of entities interact with each other in complicated patterns that can encompass multiple types of relationships, change in time, and include other types of complications. Such systems include multiple subsystems and layers of connectivity, and it is important to take such \"multilayer\" features into account to try to improve our understanding of complex systems. Consequently, it is necessary to generalize \"traditional\" network theory by developing (and validating) a framework and associated tools to study multilayer systems in a comprehensive fashion. The origins of such efforts date back several decades and arose in multiple disciplines, and now the study of multilayer networks has become one of the most important directions in network science. In this paper, we discuss the history of multilayer networks (and related concepts) and review the exploding body of work on such networks. To unify the disparate terminology in the large body of recent work, we discuss a general framework for multilayer networks, construct a dictionary of terminology to relate the numerous existing concepts to each other, and provide a thorough discussion that compares, contrasts, and translates between related notions such as multilayer networks, multiplex networks, interdependent networks, networks of networks, and many others. We also survey and discuss existing data sets that can be represented as multilayer networks. We review attempts to generalize single-layer-network diagnostics to multilayer networks. We also discuss the rapidly expanding research on multilayer-network models and notions like community structure, connected components, tensor decompositions, and various types of dynamical processes on multilayer networks. We conclude with a summary and an outlook.","id":"50|doi_dedup___::c5a574592f2e347f27be49d2c20a5558","keywords":"applied mathematics, computational mathematics, control and optimization, management science and operations research, computer networks and communications, data science, connected component, terminology, complex system, network theory, network science, construct (philosophy), computer science, interdependent networks, set (psychology), 01 natural sciences, 0103 physical sciences, 010306 general physics, 010305 fluids & plasmas, physics - physics and society, computer science - social and information networks, physics and society (physics.soc-ph), social and information networks (cs.si), fos: physical sciences, fos: computer and information sciences","language":"und","publication_date":"2013-09-27","publisher":"Oxford University Press (OUP)","title":"Multilayer networks","type":"publication"}
{"accessright":"OPEN","country":"","description":"In most natural and engineered systems, a set of entities interact with each other in complicated patterns that can encompass multiple types of relationships, change in time, and include other types of complications. Such systems include multiple subsystems and layers of connectivity, and it is important to take such \"multilayer\" features into account to try to improve our understanding of complex systems. Consequently, it is necessary to generalize \"traditional\" network theory by developing (and validating) a framework and associated tools to study multilayer systems in a comprehensive fashion. The origins of such efforts date back several decades and arose in multiple disciplines, and now the study of multilayer networks has become one of the most important directions in network science. In this paper, we discuss the history of multilayer networks (and related concepts) and review the exploding body of work on such networks. To unify the disparate terminology in the large body of recent work, we discuss a general framework for multilayer networks, construct a dictionary of terminology to relate the numerous existing concepts to each other, and provide a thorough discussion that compares, contrasts, and translates between related notions such as multilayer networks, multiplex networks, interdependent networks, networks of networks, and many others. We also survey and discuss existing data sets that can be represented as multilayer networks. We review attempts to generalize single-layer-network diagnostics to multilayer networks. We also discuss the rapidly expanding research on multilayer-network models and notions like community structure, connected components, tensor decompositions, and various types of dynamical processes on multilayer networks. We conclude with a summary and an outlook.","id":"50|doi_dedup___::c5a574592f2e347f27be49d2c20a5558","keywords":"applied mathematics, computational mathematics, control and optimization, management science and operations research, computer networks and communications, data science, connected component, terminology, complex system, network theory, network science, construct (philosophy), computer science, interdependent networks, set (psychology), 01 natural sciences, 0103 physical sciences, 010306 general physics, 010305 fluids & plasmas, physics - physics and society, computer science - social and information networks, physics and society (physics.soc-ph), social and information networks (cs.si), fos: physical sciences, fos: computer and information sciences","language":"und","publication_date":"2013-09-27","publisher":"Oxford University Press (OUP)","title":"Multilayer networks","type":"publication"}
{"accessright":"UNKNOWN","country":"","description":"","id":"50|doi_________::715fec7723208e6f17e855c204656e2f","keywords":"","language":"und","publication_date":"1998-10-19","publisher":"American Mathematical\\n Society","title":"Good encodings for DNA-based solutions to combinatorial problems","type":"publication"}
@@ -0,0 +1 @@
50|doi_________::715fec7723208e6f17e855c204656e2f
pom.xml
@@ -102,7 +102,7 @@
		<junit-jupiter.version>5.6.1</junit-jupiter.version>
		<dhp.commons.lang.version>3.5</dhp.commons.lang.version>
		<dhp.guava.version>11.0.2</dhp.guava.version>
		<dhp-schemas.version>[2.12.1]</dhp-schemas.version>
		<dhp-schemas.version>[2.13.1-patched]</dhp-schemas.version>
	</properties>

</project>