[Transformative Agreement] add results with information abount the agreement and the country of the organization paid for it

This commit is contained in:
Miriam Baglioni 2024-02-13 12:21:07 +01:00
parent bdb6bbb365
commit eca021f4d6
10 changed files with 96903 additions and 26 deletions

View File

@ -13,7 +13,6 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
@ -24,12 +23,13 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
import eu.dnetlib.dhp.actionmanager.transformativeagreement.model.TransformativeAgreementModel;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Country;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.utils.*;
import scala.Tuple2;
@ -72,11 +72,11 @@ public class CreateActionSetSparkJob implements Serializable {
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> getRelations(spark, inputPath, outputPath));
spark -> createActionSet(spark, inputPath, outputPath));
}
private static void getRelations(SparkSession spark, String inputPath, String outputPath) {
private static void createActionSet(SparkSession spark, String inputPath, String outputPath) {
spark
.read()
.textFile(inputPath)
@ -92,10 +92,55 @@ public class CreateActionSetSparkJob implements Serializable {
.filter((FilterFunction<Relation>) Objects::nonNull)
.toJavaRDD()
.map(p -> new AtomicAction(p.getClass(), p))
.union(
spark
.read()
.textFile(inputPath)
.map(
(MapFunction<String, TransformativeAgreementModel>) value -> OBJECT_MAPPER
.readValue(value, TransformativeAgreementModel.class),
Encoders.bean(TransformativeAgreementModel.class))
.map(
(MapFunction<TransformativeAgreementModel, Result>) value -> createResult(
value),
Encoders.bean(Result.class))
.filter((FilterFunction<Result>) r -> r != null)
.toJavaRDD()
.map(p -> new AtomicAction(p.getClass(), p)))
.mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
.saveAsHadoopFile(
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
}
private static Result createResult(TransformativeAgreementModel value) {
Result r = new Result();
r
.setId(
"50|doi_________::"
+ IdentifierFactory
.md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getDoi())));
r.setTransformativeAgreement(value.getAgreement());
Country country = new Country();
country.setClassid(value.getCountry());
country.setClassname(value.getCountry());
country
.setDataInfo(
OafMapperUtils
.dataInfo(
false, ModelConstants.SYSIMPORT_ACTIONSET, false, false,
OafMapperUtils
.qualifier(
"openapc::transformativeagreement",
"Harvested from Trnasformative Agreement file from OpenAPC",
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
"0.9"));
country.setSchemeid(ModelConstants.DNET_COUNTRY_TYPE);
country.setSchemename(ModelConstants.DNET_COUNTRY_TYPE);
r.setCountry(Arrays.asList(country));
return r;
}
private static List<Relation> createRelation(TransformativeAgreementModel value) {

View File

@ -15,6 +15,15 @@ public class TransformativeAgreementModel implements Serializable {
private String institution;
private String doi;
private String agreement;
private String country;
public String getCountry() {
return country;
}
public void setCountry(String country) {
this.country = country;
}
public String getInstitution() {
return institution;

View File

@ -16,11 +16,5 @@
"paramLongName": "isSparkSessionManaged",
"paramDescription": "the hdfs name node",
"paramRequired": false
},
{
"paramName": "sdr",
"paramLongName": "shouldDuplicateRels",
"paramDescription": "activates/deactivates the construction of bidirectional relations Cites/IsCitedBy",
"paramRequired": false
}
]

View File

@ -0,0 +1,324 @@
package eu.dnetlib.dhp.actionmanager.transformativeagreement;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.opencitations.CreateActionSetSparkJob;
import eu.dnetlib.dhp.actionmanager.opencitations.CreateOpenCitationsASTest;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
/**
* @author miriam.baglioni
* @Date 13/02/24
*/
public class CreateTAActionSetTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory
.getLogger(CreateOpenCitationsASTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files
.createTempDirectory(CreateTAActionSetTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(CreateTAActionSetTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(CreateTAActionSetTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
void createActionSet() throws Exception {
String inputPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/transformativeagreement/facts.json")
.getPath();
eu.dnetlib.dhp.actionmanager.transformativeagreement.CreateActionSetSparkJob
.main(
new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-inputPath",
inputPath,
"-outputPath",
workingDir.toString() + "/actionSet1"
});
}
@Test
void testNumberofRelations2() throws Exception {
String inputPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath();
eu.dnetlib.dhp.actionmanager.opencitations.CreateActionSetSparkJob
.main(
new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-inputPath",
inputPath,
"-outputPath",
workingDir.toString() + "/actionSet2"
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Relation> tmp = sc
.sequenceFile(workingDir.toString() + "/actionSet2", Text.class, Text.class)
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload()));
assertEquals(23, tmp.count());
// tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
}
@Test
void testRelationsCollectedFrom() throws Exception {
String inputPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath();
eu.dnetlib.dhp.actionmanager.opencitations.CreateActionSetSparkJob
.main(
new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-inputPath",
inputPath,
"-outputPath",
workingDir.toString() + "/actionSet3"
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Relation> tmp = sc
.sequenceFile(workingDir.toString() + "/actionSet3", Text.class, Text.class)
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload()));
tmp.foreach(r -> {
assertEquals(ModelConstants.OPENOCITATIONS_NAME, r.getCollectedfrom().get(0).getValue());
assertEquals(ModelConstants.OPENOCITATIONS_ID, r.getCollectedfrom().get(0).getKey());
});
}
@Test
void testRelationsDataInfo() throws Exception {
String inputPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath();
eu.dnetlib.dhp.actionmanager.opencitations.CreateActionSetSparkJob
.main(
new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-inputPath",
inputPath,
"-outputPath",
workingDir.toString() + "/actionSet4"
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Relation> tmp = sc
.sequenceFile(workingDir.toString() + "/actionSet4", Text.class, Text.class)
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload()));
tmp.foreach(r -> {
assertEquals(false, r.getDataInfo().getInferred());
assertEquals(false, r.getDataInfo().getDeletedbyinference());
assertEquals("0.91", r.getDataInfo().getTrust());
assertEquals(
eu.dnetlib.dhp.actionmanager.opencitations.CreateActionSetSparkJob.OPENCITATIONS_CLASSID,
r.getDataInfo().getProvenanceaction().getClassid());
assertEquals(
eu.dnetlib.dhp.actionmanager.opencitations.CreateActionSetSparkJob.OPENCITATIONS_CLASSNAME,
r.getDataInfo().getProvenanceaction().getClassname());
assertEquals(ModelConstants.DNET_PROVENANCE_ACTIONS, r.getDataInfo().getProvenanceaction().getSchemeid());
assertEquals(ModelConstants.DNET_PROVENANCE_ACTIONS, r.getDataInfo().getProvenanceaction().getSchemename());
});
}
@Test
void testRelationsSemantics() throws Exception {
String inputPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath();
eu.dnetlib.dhp.actionmanager.opencitations.CreateActionSetSparkJob
.main(
new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-inputPath",
inputPath,
"-outputPath",
workingDir.toString() + "/actionSet5"
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Relation> tmp = sc
.sequenceFile(workingDir.toString() + "/actionSet5", Text.class, Text.class)
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload()));
tmp.foreach(r -> {
assertEquals("citation", r.getSubRelType());
assertEquals("resultResult", r.getRelType());
});
assertEquals(23, tmp.filter(r -> r.getRelClass().equals("Cites")).count());
assertEquals(0, tmp.filter(r -> r.getRelClass().equals("IsCitedBy")).count());
}
@Test
void testRelationsSourceTargetPrefix() throws Exception {
String inputPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath();
eu.dnetlib.dhp.actionmanager.opencitations.CreateActionSetSparkJob
.main(
new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-inputPath",
inputPath,
"-outputPath",
workingDir.toString() + "/actionSet6"
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Relation> tmp = sc
.sequenceFile(workingDir.toString() + "/actionSet6", Text.class, Text.class)
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload()));
tmp.foreach(r -> {
assertEquals("50|doi_________::", r.getSource().substring(0, 17));
assertEquals("50|doi_________::", r.getTarget().substring(0, 17));
});
}
@Test
void testRelationsSourceTargetCouple() throws Exception {
final String doi1 = "50|doi_________::"
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/s10854-015-3684-x"));
final String doi2 = "50|doi_________::"
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1111/j.1551-2916.2008.02408.x"));
final String doi3 = "50|doi_________::"
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/s10854-014-2114-9"));
final String doi4 = "50|doi_________::"
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1016/j.ceramint.2013.09.069"));
final String doi5 = "50|doi_________::"
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/s10854-009-9913-4"));
final String doi6 = "50|doi_________::"
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1016/0038-1098(72)90370-5"));
String inputPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
.getPath();
CreateActionSetSparkJob
.main(
new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-inputPath",
inputPath,
"-outputPath",
workingDir.toString() + "/actionSet7"
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Relation> tmp = sc
.sequenceFile(workingDir.toString() + "/actionSet7", Text.class, Text.class)
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload()));
JavaRDD<Relation> check = tmp.filter(r -> r.getSource().equals(doi1) || r.getTarget().equals(doi1));
assertEquals(5, check.count());
// check.foreach(r -> {
// if (r.getSource().equals(doi2) || r.getSource().equals(doi3) || r.getSource().equals(doi4) ||
// r.getSource().equals(doi5) || r.getSource().equals(doi6)) {
// assertEquals(ModelConstants.IS_CITED_BY, r.getRelClass());
// assertEquals(doi1, r.getTarget());
// }
// });
assertEquals(5, check.filter(r -> r.getSource().equals(doi1)).count());
check.filter(r -> r.getSource().equals(doi1)).foreach(r -> assertEquals(ModelConstants.CITES, r.getRelClass()));
}
}

View File

@ -24,9 +24,14 @@ class CrossrefMappingTest {
@Test
def testMissingAuthorParser(): Unit = {
val json: String = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/s41567-022-01757-y.json")).mkString
val json: String = Source
.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/s41567-022-01757-y.json"))
.mkString
val result = Crossref2Oaf.convert(json)
result.filter(o => o.isInstanceOf[Publication]).map(p=> p.asInstanceOf[Publication]).foreach(p =>assertTrue(p.getAuthor.size()>0))
result
.filter(o => o.isInstanceOf[Publication])
.map(p => p.asInstanceOf[Publication])
.foreach(p => assertTrue(p.getAuthor.size() > 0))
}
@Test

View File

@ -1293,6 +1293,7 @@ class MappersTest {
p.getInstance().get(0).getAlternateIdentifier().forEach(x -> System.out.println(x.getValue()));
}
@Test
void testNotWellFormed() throws IOException {
final String xml = IOUtils

View File

@ -888,7 +888,7 @@
<mockito-core.version>3.3.3</mockito-core.version>
<mongodb.driver.version>3.4.2</mongodb.driver.version>
<vtd.version>[2.12,3.0)</vtd.version>
<dhp-schemas.version>[4.17.2]</dhp-schemas.version>
<dhp-schemas.version>[5.17.3]</dhp-schemas.version>
<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>