Merge pull request 'openCitations' (#194) from openCitations into beta

Reviewed-on: D-Net/dnet-hadoop#194
This commit is contained in:
Miriam Baglioni 2022-02-14 14:58:28 +01:00
commit a1013e62d4
21 changed files with 392 additions and 32 deletions

View File

@ -14,6 +14,7 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -21,6 +22,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
@ -83,10 +85,13 @@ public class CreateActionSetSparkJob implements Serializable {
private static void extractContent(SparkSession spark, String inputPath, String outputPath, private static void extractContent(SparkSession spark, String inputPath, String outputPath,
boolean shouldDuplicateRels) { boolean shouldDuplicateRels) {
spark spark
.sqlContext() .read()
.createDataset(spark.sparkContext().textFile(inputPath + "/*", 6000), Encoders.STRING()) .textFile(inputPath + "/*")
.map(
(MapFunction<String, COCI>) value -> OBJECT_MAPPER.readValue(value, COCI.class),
Encoders.bean(COCI.class))
.flatMap( .flatMap(
(FlatMapFunction<String, Relation>) value -> createRelation(value, shouldDuplicateRels).iterator(), (FlatMapFunction<COCI, Relation>) value -> createRelation(value, shouldDuplicateRels).iterator(),
Encoders.bean(Relation.class)) Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) value -> value != null) .filter((FilterFunction<Relation>) value -> value != null)
.toJavaRDD() .toJavaRDD()
@ -98,26 +103,29 @@ public class CreateActionSetSparkJob implements Serializable {
} }
private static List<Relation> createRelation(String value, boolean duplicate) { private static List<Relation> createRelation(COCI value, boolean duplicate) {
String[] line = value.split(",");
if (!line[1].startsWith("10.")) {
return new ArrayList<>();
}
List<Relation> relationList = new ArrayList<>(); List<Relation> relationList = new ArrayList<>();
String citing = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", line[1])); String citing = ID_PREFIX
final String cited = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", line[2])); + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCiting()));
final String cited = ID_PREFIX
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCited()));
relationList if(!citing.equals(cited)){
.addAll( relationList
getRelations( .addAll(
citing, getRelations(
cited)); citing,
cited));
if (duplicate && line[1].endsWith(".refs")) { if (duplicate && value.getCiting().endsWith(".refs")) {
citing = ID_PREFIX + IdentifierFactory citing = ID_PREFIX + IdentifierFactory
.md5(CleaningFunctions.normalizePidValue("doi", line[1].substring(0, line[1].indexOf(".refs")))); .md5(
relationList.addAll(getRelations(citing, cited)); CleaningFunctions
.normalizePidValue("doi", value.getCiting().substring(0, value.getCiting().indexOf(".refs"))));
relationList.addAll(getRelations(citing, cited));
}
} }
return relationList; return relationList;

View File

@ -0,0 +1,103 @@
package eu.dnetlib.dhp.actionmanager.opencitations;
import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER;
import static eu.dnetlib.dhp.actionmanager.Constants.isSparkSessionManaged;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException;
import java.io.Serializable;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class ReadCOCI implements Serializable {
private static final Logger log = LoggerFactory.getLogger(ReadCOCI.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
ReadCOCI.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String[] inputFile = parser.get("inputFile").split(";");
log.info("inputFile {}", inputFile.toString());
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String workingPath = parser.get("workingPath");
log.info("workingPath {}", workingPath);
SparkConf sconf = new SparkConf();
final String delimiter = Optional
.ofNullable(parser.get("delimiter"))
.orElse(DEFAULT_DELIMITER);
runWithSparkSession(
sconf,
isSparkSessionManaged,
spark -> {
doRead(
spark,
workingPath,
inputFile,
outputPath,
delimiter);
});
}
private static void doRead(SparkSession spark, String workingPath, String[] inputFiles,
String outputPath,
String delimiter) throws IOException {
for(String inputFile : inputFiles){
String p_string = workingPath + "/" + inputFile ;
Dataset<Row> cociData = spark
.read()
.format("csv")
.option("sep", delimiter)
.option("inferSchema", "true")
.option("header", "true")
.option("quotes", "\"")
.load(p_string)
.repartition(100);
cociData.map((MapFunction<Row, COCI>) row -> {
COCI coci = new COCI();
coci.setOci(row.getString(0));
coci.setCiting(row.getString(1));
coci.setCited(row.getString(2));
return coci;
}, Encoders.bean(COCI.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + inputFile);
}
}
}

View File

@ -0,0 +1,41 @@
package eu.dnetlib.dhp.actionmanager.opencitations.model;
import java.io.Serializable;
import com.opencsv.bean.CsvBindByPosition;
public class COCI implements Serializable {
private String oci;
private String citing;
private String cited;
public String getOci() {
return oci;
}
public void setOci(String oci) {
this.oci = oci;
}
public String getCiting() {
return citing;
}
public void setCiting(String citing) {
this.citing = citing;
}
public String getCited() {
return cited;
}
public void setCited(String cited) {
this.cited = cited;
}
}

View File

@ -0,0 +1,37 @@
[
{
"paramName": "wp",
"paramLongName": "workingPath",
"paramDescription": "the zipped opencitations file",
"paramRequired": true
},
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "the hdfs name node",
"paramRequired": false
},
{
"paramName": "d",
"paramLongName": "delimiter",
"paramDescription": "the hdfs name node",
"paramRequired": false
},
{
"paramName": "op",
"paramLongName": "outputPath",
"paramDescription": "the hdfs name node",
"paramRequired": true
},
{
"paramName": "if",
"paramLongName": "inputFile",
"paramDescription": "the hdfs name node",
"paramRequired": true
}
]

View File

@ -26,6 +26,7 @@
<switch> <switch>
<case to="download">${wf:conf('resumeFrom') eq 'DownloadDump'}</case> <case to="download">${wf:conf('resumeFrom') eq 'DownloadDump'}</case>
<case to="extract">${wf:conf('resumeFrom') eq 'ExtractContent'}</case> <case to="extract">${wf:conf('resumeFrom') eq 'ExtractContent'}</case>
<case to="read">${wf:conf('resumeFrom') eq 'ReadContent'}</case>
<default to="create_actionset"/> <!-- first action to be done when downloadDump is to be performed --> <default to="create_actionset"/> <!-- first action to be done when downloadDump is to be performed -->
</switch> </switch>
</decision> </decision>
@ -60,6 +61,32 @@
<arg>--inputFile</arg><arg>${inputFile}</arg> <arg>--inputFile</arg><arg>${inputFile}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
</java> </java>
<ok to="read"/>
<error to="Kill"/>
</action>
<action name="read">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Produces the AS for OC</name>
<class>eu.dnetlib.dhp.actionmanager.opencitations.ReadCOCI</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}/COCI</arg>
<arg>--outputPath</arg><arg>${workingPath}/COCI_JSON</arg>
<arg>--delimiter</arg><arg>${delimiter}</arg>
<arg>--inputFile</arg><arg>${inputFileCoci}</arg>
</spark>
<ok to="create_actionset"/> <ok to="create_actionset"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
@ -81,7 +108,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/COCI</arg> <arg>--inputPath</arg><arg>${workingPath}/COCI_JSON</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg> <arg>--outputPath</arg><arg>${outputPath}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>

View File

@ -76,7 +76,7 @@ public class CreateOpenCitationsASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") "/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath(); .getPath();
CreateActionSetSparkJob CreateActionSetSparkJob
@ -99,7 +99,7 @@ public class CreateOpenCitationsASTest {
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload())); .map(aa -> ((Relation) aa.getPayload()));
assertEquals(60, tmp.count()); assertEquals(62, tmp.count());
// tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r))); // tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
@ -110,7 +110,7 @@ public class CreateOpenCitationsASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") "/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath(); .getPath();
CreateActionSetSparkJob CreateActionSetSparkJob
@ -131,7 +131,7 @@ public class CreateOpenCitationsASTest {
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload())); .map(aa -> ((Relation) aa.getPayload()));
assertEquals(44, tmp.count()); assertEquals(46, tmp.count());
// tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r))); // tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
@ -142,7 +142,7 @@ public class CreateOpenCitationsASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") "/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath(); .getPath();
CreateActionSetSparkJob CreateActionSetSparkJob
@ -175,7 +175,7 @@ public class CreateOpenCitationsASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") "/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath(); .getPath();
CreateActionSetSparkJob CreateActionSetSparkJob
@ -215,7 +215,7 @@ public class CreateOpenCitationsASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") "/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath(); .getPath();
CreateActionSetSparkJob CreateActionSetSparkJob
@ -240,8 +240,8 @@ public class CreateOpenCitationsASTest {
assertEquals("citation", r.getSubRelType()); assertEquals("citation", r.getSubRelType());
assertEquals("resultResult", r.getRelType()); assertEquals("resultResult", r.getRelType());
}); });
assertEquals(22, tmp.filter(r -> r.getRelClass().equals("Cites")).count()); assertEquals(23, tmp.filter(r -> r.getRelClass().equals("Cites")).count());
assertEquals(22, tmp.filter(r -> r.getRelClass().equals("IsCitedBy")).count()); assertEquals(23, tmp.filter(r -> r.getRelClass().equals("IsCitedBy")).count());
} }
@ -250,7 +250,7 @@ public class CreateOpenCitationsASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") "/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath(); .getPath();
CreateActionSetSparkJob CreateActionSetSparkJob
@ -295,7 +295,7 @@ public class CreateOpenCitationsASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") "/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath(); .getPath();
CreateActionSetSparkJob CreateActionSetSparkJob

View File

@ -0,0 +1,140 @@
package eu.dnetlib.dhp.actionmanager.opencitations;
import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
import eu.dnetlib.dhp.schema.oaf.Dataset;
public class ReadCOCITest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory
.getLogger(ReadCOCITest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files
.createTempDirectory(ReadCOCITest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(ReadCOCITest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(ReadCOCITest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
void testReadCOCI() throws Exception {
String inputPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles")
.getPath();
LocalFileSystem fs = FileSystem.getLocal(new Configuration());
fs
.copyFromLocalFile(
false, new org.apache.hadoop.fs.Path(getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input1")
.getPath()),
new org.apache.hadoop.fs.Path(workingDir + "/COCI/input1"));
fs
.copyFromLocalFile(
false, new org.apache.hadoop.fs.Path(getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input2")
.getPath()),
new org.apache.hadoop.fs.Path(workingDir + "/COCI/input2"));
fs
.copyFromLocalFile(
false, new org.apache.hadoop.fs.Path(getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input3")
.getPath()),
new org.apache.hadoop.fs.Path(workingDir + "/COCI/input3"));
fs
.copyFromLocalFile(
false, new org.apache.hadoop.fs.Path(getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input4")
.getPath()),
new org.apache.hadoop.fs.Path(workingDir + "/COCI/input4"));
fs
.copyFromLocalFile(
false, new org.apache.hadoop.fs.Path(getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input5")
.getPath()),
new org.apache.hadoop.fs.Path(workingDir + "/COCI/input5"));
ReadCOCI
.main(
new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-workingPath",
workingDir.toString() + "/COCI",
"-outputPath",
workingDir.toString() + "/COCI_json/",
"-inputFile", "input1;input2;input3;input4;input5"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<COCI> tmp = sc
.textFile(workingDir.toString() + "/COCI_json/*/")
.map(item -> OBJECT_MAPPER.readValue(item, COCI.class));
Assertions.assertEquals(24, tmp.count());
Assertions.assertEquals(1, tmp.filter(c -> c.getCiting().equals("10.1207/s15327647jcd3,4-01")).count());
Assertions.assertEquals(8, tmp.filter(c -> c.getCiting().indexOf(".refs") > -1).count());
}
}

View File

@ -0,0 +1,2 @@
oci,citing,cited,creation,timespan,journal_sc,author_sc
0200102000736280105030207060407191213036204630001-02001000107362800030005000000090000000006060903,"10.1207/s15327647jcd3,4-01",10.1017/s0305000900006693,2002-11-01,P17Y1M,no,no

View File

@ -0,0 +1,2 @@
oci,citing,cited,creation,timespan,journal_sc,author_sc
02001000007362801000805046300010563030608046333-02001000007362801000805046300010563030608046333,10.1007/s10854-015-3684-x,10.1007/s10854-015-3684-x,2015-09-01,P7Y2M,no,no

View File

@ -44,7 +44,7 @@
<pluginRepository> <pluginRepository>
<id>iis-releases</id> <id>iis-releases</id>
<name>iis releases plugin repository</name> <name>iis releases plugin repository</name>
<url>http://maven.ceon.pl/artifactory/iis-releases</url> <url>https://maven.ceon.pl/artifactory/iis-releases</url>
<layout>default</layout> <layout>default</layout>
</pluginRepository> </pluginRepository>
</pluginRepositories> </pluginRepositories>