[EOSC BulkTag] filtering aout the datasources registered in the eosc with compatibility different from 3.0, 4.0 for literature, data and CRIS to add the context eosc to the results

This commit is contained in:
Miriam Baglioni 2022-09-20 10:30:41 +02:00
parent bdc8f993d0
commit 840465958b
7 changed files with 195 additions and 68 deletions

View File

@ -16,6 +16,7 @@ import javax.print.attribute.DocAttributeSet;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.ForeachFunction; import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
@ -34,6 +35,7 @@ import eu.dnetlib.dhp.bulktag.community.*;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import scala.Tuple2;
/** /**
* @author miriam.baglioni * @author miriam.baglioni
@ -44,6 +46,11 @@ public class SparkEoscBulkTag implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkEoscBulkTag.class); private static final Logger log = LoggerFactory.getLogger(SparkEoscBulkTag.class);
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static String OPENAIRE_3 = "openaire3.0";
private static String OPENAIRE_4 = "openaire-pub_4.0";
private static String OPENAIRE_CRIS = "openaire-cris_1.1";
private static String OPENAIRE_DATA = "openaire2.0_data";
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils String jsonConfiguration = IOUtils
.toString( .toString(
@ -72,6 +79,9 @@ public class SparkEoscBulkTag implements Serializable {
final String resultClassName = parser.get("resultTableName"); final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName); log.info("resultTableName: {}", resultClassName);
final String resultType = parser.get("resultType");
log.info("resultType: {}", resultType);
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName); Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
@ -82,41 +92,67 @@ public class SparkEoscBulkTag implements Serializable {
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
removeOutputDir(spark, workingPath); removeOutputDir(spark, workingPath);
execBulkTag(spark, inputPath, workingPath, datasourceMapPath, resultClazz); selectCompliantDatasources(spark, inputPath, workingPath, datasourceMapPath);
execBulkTag(spark, inputPath, workingPath, resultType, resultClazz);
}); });
} }
private static void selectCompliantDatasources(SparkSession spark, String inputPath, String workingPath, String datasourceMapPath) {
Dataset<Datasource> datasources = readPath(spark, inputPath + "datasource", Datasource.class)
.filter((FilterFunction<Datasource>) ds -> {
final String compatibility = ds.getOpenairecompatibility().getClassid();
return compatibility.equalsIgnoreCase(OPENAIRE_3) ||
compatibility.equalsIgnoreCase(OPENAIRE_4) ||
compatibility.equalsIgnoreCase(OPENAIRE_CRIS) ||
compatibility.equalsIgnoreCase(OPENAIRE_DATA);
});
Dataset<DatasourceMaster> datasourceMaster = readPath(spark, datasourceMapPath, DatasourceMaster.class);
datasources.joinWith(datasourceMaster, datasources.col("id").equalTo(datasourceMaster.col("master")), "left")
.map((MapFunction<Tuple2<Datasource, DatasourceMaster>, DatasourceMaster>) t2 -> t2._2(), Encoders.bean(DatasourceMaster.class) )
.filter(Objects::nonNull)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath + "datasource");
}
private static <R extends Result> void execBulkTag( private static <R extends Result> void execBulkTag(
SparkSession spark, SparkSession spark,
String inputPath, String inputPath,
String workingPath, String workingPath,
String datasourceMapPath, String resultType,
Class<R> resultClazz) { Class<R> resultClazz) {
List<String> hostedByList = readPath(spark, datasourceMapPath, DatasourceMaster.class) List<String> hostedByList = readPath(spark, workingPath + "datasource", DatasourceMaster.class)
.map((MapFunction<DatasourceMaster, String>) dm -> dm.getMaster(), Encoders.STRING()) .map((MapFunction<DatasourceMaster, String>) dm -> dm.getMaster(), Encoders.STRING())
.collectAsList(); .collectAsList();
readPath(spark, inputPath, resultClazz) readPath(spark, inputPath + resultType, resultClazz)
.map(patchResult(), Encoders.bean(resultClazz)) .map(
.filter(Objects::nonNull)
.map(
(MapFunction<R, R>) value -> enrich(value, hostedByList), (MapFunction<R, R>) value -> enrich(value, hostedByList),
Encoders.bean(resultClazz)) Encoders.bean(resultClazz))
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
.json(workingPath); .json(workingPath + resultType);
readPath(spark, workingPath, resultClazz) readPath(spark, workingPath + resultType, resultClazz)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
.json(inputPath); .json(inputPath + resultType);
} }
private static <R extends Result> R enrich(R value, List<String> hostedByList) { private static <R extends Result> R enrich(R value, List<String> hostedByList) {
if (value.getDataInfo().getDeletedbyinference() == null) {
value.getDataInfo().setDeletedbyinference(false);
}
if (value.getContext() == null) {
value.setContext(new ArrayList<>());
}
if (value if (value
.getInstance() .getInstance()
.stream() .stream()

View File

@ -29,6 +29,13 @@
"paramLongName": "isSparkSessionManaged", "paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise", "paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false "paramRequired": false
},
{
"paramName": "rt",
"paramLongName": "resultType",
"paramDescription": "the result type",
"paramRequired": true
} }
] ]

View File

@ -282,8 +282,9 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${outputPath}/publication</arg> <arg>--sourcePath</arg><arg>${outputPath}/</arg>
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/publication</arg> <arg>--resultType</arg><arg>publication</arg>
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg> <arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
</spark> </spark>
@ -308,8 +309,9 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${outputPath}/dataset</arg> <arg>--sourcePath</arg><arg>${outputPath}/</arg>
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/dataset</arg> <arg>--resultType</arg><arg>dataset</arg>
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg> <arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
</spark> </spark>
@ -333,8 +335,9 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${outputPath}/software</arg> <arg>--sourcePath</arg><arg>${outputPath}/</arg>
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/software</arg> <arg>--resultType</arg><arg>software</arg>
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg> <arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
</spark> </spark>
@ -358,8 +361,9 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${outputPath}/otherresearchproduct</arg> <arg>--sourcePath</arg><arg>${outputPath}/</arg>
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/otherresearchproduct</arg> <arg>--resultType</arg><arg>otherresearchproduct</arg>
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg> <arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
</spark> </spark>

View File

@ -6,6 +6,8 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.List; import java.util.List;
import eu.dnetlib.dhp.bulktag.eosc.DatasourceMaster;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
@ -28,14 +30,10 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag; import eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
//"50|475c1990cbb2::0fecfb874d9395aa69d2f4d7cd1acbea" has instance hostedby eosc //"50|475c1990cbb2::0fecfb874d9395aa69d2f4d7cd1acbea" has instance hostedby eosc (cris)
//"50|475c1990cbb2::3185cd5d8a2b0a06bb9b23ef11748eb1" has instance hostedby eosc //"50|475c1990cbb2::3185cd5d8a2b0a06bb9b23ef11748eb1" has instance hostedby eosc (zenodo)
//"50|475c1990cbb2::449f28eefccf9f70c04ad70d61e041c7" has two instance one hostedby eosc //"50|475c1990cbb2::449f28eefccf9f70c04ad70d61e041c7" has two instance one hostedby eosc (wrong compatibility)
//"50|475c1990cbb2::3894c94123e96df8a21249957cf160cb" has EoscTag //"50|475c1990cbb2::3894c94123e96df8a21249957cf160cb" has EoscTag
public class EOSCContextTaggingTest { public class EOSCContextTaggingTest {
@ -76,7 +74,107 @@ public class EOSCContextTaggingTest {
} }
@Test @Test
void EoscContextTagTest() throws Exception { void EoscContextTagTest() throws Exception{
spark
.read()
.textFile(getClass().getResource("/eu/dnetlib/dhp/bulktag/eosc/datasource/datasource_1").getPath())
.map(
(MapFunction<String, Datasource>) value -> OBJECT_MAPPER.readValue(value, Datasource.class),
Encoders.bean(Datasource.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir.toString() + "/input/datasource");
spark
.read()
.textFile(getClass().getResource("/eu/dnetlib/dhp/bulktag/eosc/dataset/dataset_10.json").getPath())
.map(
(MapFunction<String, Dataset>) value -> OBJECT_MAPPER.readValue(value, Dataset.class),
Encoders.bean(Dataset.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir.toString() + "/input/dataset");
SparkEoscBulkTag
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath",
workingDir.toString() + "/input/",
"-workingPath", workingDir.toString() + "/working/",
"-datasourceMapPath",
getClass()
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
.getPath(),
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-resultType", "dataset"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
Assertions.assertEquals(2, sc
.textFile(workingDir.toString() + "/working/datasource")
.map(item -> OBJECT_MAPPER.readValue(item, DatasourceMaster.class)).count());
JavaRDD<Dataset> tmp = sc
.textFile(workingDir.toString() + "/input/dataset")
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
Assertions.assertEquals(10, tmp.count());
Assertions
.assertEquals(
2,
tmp
.filter(
s -> s.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
.count());
Assertions
.assertEquals(
1,
tmp
.filter(
d -> d.getId().equals("50|475c1990cbb2::0fecfb874d9395aa69d2f4d7cd1acbea")
&&
d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
.count());
Assertions
.assertEquals(
1,
tmp
.filter(
d -> d.getId().equals("50|475c1990cbb2::3185cd5d8a2b0a06bb9b23ef11748eb1")
&&
d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
.count());
Assertions
.assertEquals(
0,
tmp
.filter(
d -> d.getId().equals("50|475c1990cbb2::449f28eefccf9f70c04ad70d61e041c7")
&&
d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
.count());
Assertions
.assertEquals(
0,
tmp
.filter(
d -> d.getId().equals("50|475c1990cbb2::3894c94123e96df8a21249957cf160cb")
&&
d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
.count());
}
@Test
void EoscContextTagTestEmptyDatasource() throws Exception {
spark spark
.read() .read()
@ -89,22 +187,37 @@ public class EOSCContextTaggingTest {
.option("compression", "gzip") .option("compression", "gzip")
.json(workingDir.toString() + "/input/dataset"); .json(workingDir.toString() + "/input/dataset");
spark
.read()
.textFile(getClass().getResource("/eu/dnetlib/dhp/bulktag/eosc/datasource/datasource").getPath())
.map(
(MapFunction<String, Datasource>) value -> OBJECT_MAPPER.readValue(value, Datasource.class),
Encoders.bean(Datasource.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir.toString() + "/input/datasource");
SparkEoscBulkTag SparkEoscBulkTag
.main( .main(
new String[] { new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", "-sourcePath",
workingDir.toString() + "/input/dataset", workingDir.toString() + "/input/",
"-workingPath", workingDir.toString() + "/working/dataset", "-workingPath", workingDir.toString() + "/working/",
"-datasourceMapPath", "-datasourceMapPath",
getClass() getClass()
.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster") .getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
.getPath(), .getPath(),
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset" "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-resultType", "dataset"
}); });
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Dataset> tmp = sc JavaRDD<Dataset> tmp = sc
.textFile(workingDir.toString() + "/input/dataset") .textFile(workingDir.toString() + "/input/dataset")
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
@ -113,50 +226,13 @@ public class EOSCContextTaggingTest {
Assertions Assertions
.assertEquals( .assertEquals(
3, 0,
tmp tmp
.filter( .filter(
s -> s.getContext().stream().anyMatch(c -> c.getId().equals("eosc"))) s -> s.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
.count()); .count());
Assertions
.assertEquals(
1,
tmp
.filter(
d -> d.getId().equals("50|475c1990cbb2::0fecfb874d9395aa69d2f4d7cd1acbea")
&&
d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
.count());
Assertions
.assertEquals(
1,
tmp
.filter(
d -> d.getId().equals("50|475c1990cbb2::3185cd5d8a2b0a06bb9b23ef11748eb1")
&&
d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
.count());
Assertions
.assertEquals(
1,
tmp
.filter(
d -> d.getId().equals("50|475c1990cbb2::449f28eefccf9f70c04ad70d61e041c7")
&&
d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
.count());
Assertions
.assertEquals(
0,
tmp
.filter(
d -> d.getId().equals("50|475c1990cbb2::3894c94123e96df8a21249957cf160cb")
&&
d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
.count());
} }
} }

File diff suppressed because one or more lines are too long