openCitations #194

Merged
miriam.baglioni merged 6 commits from openCitations into beta 2022-02-14 14:58:28 +01:00
18 changed files with 394 additions and 26 deletions
Showing only changes of commit fbc28ee8c3 - Show all commits

View File

@ -14,6 +14,7 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -21,6 +22,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
@ -83,10 +85,16 @@ public class CreateActionSetSparkJob implements Serializable {
private static void extractContent(SparkSession spark, String inputPath, String outputPath, private static void extractContent(SparkSession spark, String inputPath, String outputPath,
boolean shouldDuplicateRels) { boolean shouldDuplicateRels) {
spark spark
.sqlContext() .read()
.createDataset(spark.sparkContext().textFile(inputPath + "/*", 6000), Encoders.STRING()) .textFile(inputPath + "/*")
.map(
(MapFunction<String, COCI>) value -> OBJECT_MAPPER.readValue(value, COCI.class),
Encoders.bean(COCI.class))
// spark
// .sqlContext()
// .createDataset(spark.sparkContext().textFile(inputPath + "/*", 6000), Encoders.STRING())
.flatMap( .flatMap(
(FlatMapFunction<String, Relation>) value -> createRelation(value, shouldDuplicateRels).iterator(), (FlatMapFunction<COCI, Relation>) value -> createRelation(value, shouldDuplicateRels).iterator(),
Encoders.bean(Relation.class)) Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) value -> value != null) .filter((FilterFunction<Relation>) value -> value != null)
.toJavaRDD() .toJavaRDD()
@ -98,15 +106,14 @@ public class CreateActionSetSparkJob implements Serializable {
} }
private static List<Relation> createRelation(String value, boolean duplicate) { private static List<Relation> createRelation(COCI value, boolean duplicate) {
String[] line = value.split(",");
if (!line[1].startsWith("10.")) {
return new ArrayList<>();
}
List<Relation> relationList = new ArrayList<>(); List<Relation> relationList = new ArrayList<>();
String citing = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", line[1])); String citing = ID_PREFIX
final String cited = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", line[2])); + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCiting()));
final String cited = ID_PREFIX
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCited()));
relationList relationList
.addAll( .addAll(
@ -114,9 +121,11 @@ public class CreateActionSetSparkJob implements Serializable {
citing, citing,
cited)); cited));
if (duplicate && line[1].endsWith(".refs")) { if (duplicate && value.getCiting().endsWith(".refs")) {
citing = ID_PREFIX + IdentifierFactory citing = ID_PREFIX + IdentifierFactory
.md5(CleaningFunctions.normalizePidValue("doi", line[1].substring(0, line[1].indexOf(".refs")))); .md5(
CleaningFunctions
.normalizePidValue("doi", value.getCiting().substring(0, value.getCiting().indexOf(".refs"))));
relationList.addAll(getRelations(citing, cited)); relationList.addAll(getRelations(citing, cited));
} }

View File

@ -0,0 +1,111 @@
package eu.dnetlib.dhp.actionmanager.opencitations;
import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER;
import static eu.dnetlib.dhp.actionmanager.Constants.isSparkSessionManaged;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException;
import java.io.Serializable;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class ReadCOCI implements Serializable {
private static final Logger log = LoggerFactory.getLogger(ReadCOCI.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
ReadCOCI.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String hdfsNameNode = parser.get("nameNode");
log.info("nameNode: {}", hdfsNameNode);
final String inputPath = parser.get("sourcePath");
log.info("input path : {}", inputPath);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode);
FileSystem fileSystem = FileSystem.get(conf);
SparkConf sconf = new SparkConf();
final String delimiter = Optional
.ofNullable(parser.get("delimiter"))
.orElse(DEFAULT_DELIMITER);
runWithSparkSession(
sconf,
isSparkSessionManaged,
spark -> {
doRead(
spark,
fileSystem,
inputPath,
outputPath,
delimiter);
});
}
public static void doRead(SparkSession spark, FileSystem fileSystem, String inputPath, String outputPath,
String delimiter) throws IOException {
RemoteIterator<LocatedFileStatus> iterator = fileSystem
.listFiles(
new Path(inputPath), true);
while (iterator.hasNext()) {
LocatedFileStatus fileStatus = iterator.next();
Path p = fileStatus.getPath();
String p_string = p.toString();
Dataset<Row> cociData = spark
.read()
.format("csv")
.option("sep", delimiter)
.option("inferSchema", "true")
.option("header", "true")
.option("quotes", "\"")
.load(p_string);
cociData.map((MapFunction<Row, COCI>) row -> {
COCI coci = new COCI();
coci.setOci(row.getString(0));
coci.setCiting(row.getString(1));
coci.setCited(row.getString(2));
return coci;
}, Encoders.bean(COCI.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/" + p_string.substring(p_string.lastIndexOf("/") + 1));
}
}
}

View File

@ -0,0 +1,89 @@
package eu.dnetlib.dhp.actionmanager.opencitations.model;
import java.io.Serializable;
import com.opencsv.bean.CsvBindByPosition;
public class COCI implements Serializable {
@CsvBindByPosition(position = 0)
// @CsvBindByName(column = "doi")
Review

This commented line is confusing. The first column of the csv is the coci id, not the doi.
The same applies to the commented lines below

This commented line is confusing. The first column of the csv is the coci id, not the doi. The same applies to the commented lines below
Review

done

done
private String oci;
@CsvBindByPosition(position = 1)
// @CsvBindByName(column = "level1")
private String citing;
@CsvBindByPosition(position = 2)
// @CsvBindByName(column = "level2")
private String cited;
@CsvBindByPosition(position = 3)
// @CsvBindByName(column = "level3")
private String creation;
@CsvBindByPosition(position = 4)
private String timespan;
@CsvBindByPosition(position = 5)
private String journal_sc;
@CsvBindByPosition(position = 6)
private String author_sc;
public String getOci() {
return oci;
}
public void setOci(String oci) {
this.oci = oci;
}
public String getCiting() {
return citing;
}
public void setCiting(String citing) {
this.citing = citing;
}
public String getCited() {
return cited;
}
public void setCited(String cited) {
this.cited = cited;
}
public String getCreation() {
return creation;
}
public void setCreation(String creation) {
this.creation = creation;
}
public String getTimespan() {
return timespan;
}
public void setTimespan(String timespan) {
this.timespan = timespan;
}
public String getJournal_sc() {
return journal_sc;
}
public void setJournal_sc(String journal_sc) {
this.journal_sc = journal_sc;
}
public String getAuthor_sc() {
return author_sc;
}
public void setAuthor_sc(String author_sc) {
this.author_sc = author_sc;
}
}

View File

@ -0,0 +1,36 @@
[
{
"paramName": "sp",
"paramLongName": "sourcePath",
"paramDescription": "the zipped opencitations file",
"paramRequired": true
},
{
"paramName": "nn",
"paramLongName": "nameNode",
"paramDescription": "the hdfs name node",
"paramRequired": true
},
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "the hdfs name node",
"paramRequired": false
},
{
"paramName": "d",
"paramLongName": "delimiter",
"paramDescription": "the hdfs name node",
"paramRequired": false
},
{
"paramName": "op",
"paramLongName": "outputPath",
"paramDescription": "the hdfs name node",
"paramRequired": false
}
]

View File

@ -26,6 +26,7 @@
<switch> <switch>
<case to="download">${wf:conf('resumeFrom') eq 'DownloadDump'}</case> <case to="download">${wf:conf('resumeFrom') eq 'DownloadDump'}</case>
<case to="extract">${wf:conf('resumeFrom') eq 'ExtractContent'}</case> <case to="extract">${wf:conf('resumeFrom') eq 'ExtractContent'}</case>
<case to="read">${wf:conf('resumeFrom') eq 'ReadContent'}</case>
<default to="create_actionset"/> <!-- first action to be done when downloadDump is to be performed --> <default to="create_actionset"/> <!-- first action to be done when downloadDump is to be performed -->
</switch> </switch>
</decision> </decision>
@ -60,6 +61,32 @@
<arg>--inputFile</arg><arg>${inputFile}</arg> <arg>--inputFile</arg><arg>${inputFile}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
</java> </java>
<ok to="read"/>
<error to="Kill"/>
</action>
<action name="read">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Produces the AS for OC</name>
<class>eu.dnetlib.dhp.actionmanager.opencitations.ReadCOCI</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingPath}/COCI</arg>
<arg>--outputPath</arg><arg>${workingDir}/COCI</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--delimiter</arg><arg>${delimiter}</arg>
</spark>
<ok to="create_actionset"/> <ok to="create_actionset"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>

View File

@ -76,7 +76,7 @@ public class CreateOpenCitationsASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") "/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath(); .getPath();
CreateActionSetSparkJob CreateActionSetSparkJob
@ -99,7 +99,7 @@ public class CreateOpenCitationsASTest {
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload())); .map(aa -> ((Relation) aa.getPayload()));
assertEquals(60, tmp.count()); assertEquals(62, tmp.count());
// tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r))); // tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
@ -110,7 +110,7 @@ public class CreateOpenCitationsASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") "/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath(); .getPath();
CreateActionSetSparkJob CreateActionSetSparkJob
@ -131,7 +131,7 @@ public class CreateOpenCitationsASTest {
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload())); .map(aa -> ((Relation) aa.getPayload()));
assertEquals(44, tmp.count()); assertEquals(46, tmp.count());
// tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r))); // tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
@ -142,7 +142,7 @@ public class CreateOpenCitationsASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") "/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath(); .getPath();
CreateActionSetSparkJob CreateActionSetSparkJob
@ -175,7 +175,7 @@ public class CreateOpenCitationsASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") "/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath(); .getPath();
CreateActionSetSparkJob CreateActionSetSparkJob
@ -215,7 +215,7 @@ public class CreateOpenCitationsASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") "/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath(); .getPath();
CreateActionSetSparkJob CreateActionSetSparkJob
@ -240,8 +240,8 @@ public class CreateOpenCitationsASTest {
assertEquals("citation", r.getSubRelType()); assertEquals("citation", r.getSubRelType());
assertEquals("resultResult", r.getRelType()); assertEquals("resultResult", r.getRelType());
}); });
assertEquals(22, tmp.filter(r -> r.getRelClass().equals("Cites")).count()); assertEquals(23, tmp.filter(r -> r.getRelClass().equals("Cites")).count());
assertEquals(22, tmp.filter(r -> r.getRelClass().equals("IsCitedBy")).count()); assertEquals(23, tmp.filter(r -> r.getRelClass().equals("IsCitedBy")).count());
} }
@ -250,7 +250,7 @@ public class CreateOpenCitationsASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") "/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath(); .getPath();
CreateActionSetSparkJob CreateActionSetSparkJob
@ -295,7 +295,7 @@ public class CreateOpenCitationsASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") "/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath(); .getPath();
CreateActionSetSparkJob CreateActionSetSparkJob

View File

@ -0,0 +1,94 @@
package eu.dnetlib.dhp.actionmanager.opencitations;
import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
import eu.dnetlib.dhp.schema.oaf.Dataset;
public class ReadCOCITest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory
.getLogger(ReadCOCITest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files
.createTempDirectory(ReadCOCITest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(ReadCOCITest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(ReadCOCITest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
void testReadCOCI() throws Exception {
String inputPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles")
.getPath();
ReadCOCI
.doRead(
spark, FileSystem.getLocal(new Configuration()), inputPath,
workingDir.toString() + "/COCI", DEFAULT_DELIMITER);
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<COCI> tmp = sc
.textFile(workingDir.toString() + "/COCI/*/")
.map(item -> OBJECT_MAPPER.readValue(item, COCI.class));
Assertions.assertEquals(23, tmp.count());
Assertions.assertEquals(1, tmp.filter(c -> c.getCiting().equals("10.1207/s15327647jcd3,4-01")).count());
Assertions.assertEquals(8, tmp.filter(c -> c.getCiting().indexOf(".refs") > -1).count());
}
}

View File

@ -0,0 +1,2 @@
oci,citing,cited,creation,timespan,journal_sc,author_sc
0200102000736280105030207060407191213036204630001-02001000107362800030005000000090000000006060903,"10.1207/s15327647jcd3,4-01",10.1017/s0305000900006693,2002-11-01,P17Y1M,no,no

View File

@ -44,7 +44,7 @@
<pluginRepository> <pluginRepository>
<id>iis-releases</id> <id>iis-releases</id>
<name>iis releases plugin repository</name> <name>iis releases plugin repository</name>
<url>http://maven.ceon.pl/artifactory/iis-releases</url> <url>https://maven.ceon.pl/artifactory/iis-releases</url>
<layout>default</layout> <layout>default</layout>
</pluginRepository> </pluginRepository>
</pluginRepositories> </pluginRepositories>