[Tagging Projects and Datasource] changed the way the pathMap parameter is passed: the inline JSON was too long and was being truncated, so the mapping is now read from a file on HDFS

Miriam Baglioni 2024-02-19 16:12:59 +01:00
parent 8dae10b442
commit 43da7e1191
8 changed files with 171 additions and 12 deletions
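
The gist of the change: the JSON mapping formerly passed inline through --pathMap could arrive truncated once it grew past the argument size limit, so --pathMap now carries an HDFS location and a new --nameNode parameter tells the job which filesystem to resolve it against. A condensed, self-contained sketch of the new flow, assembled from the hunks below (the parser variable and ProtoMap come from the job itself; the example values in comments are illustrative):

import java.nio.charset.StandardCharsets;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.google.gson.Gson;

// "pathMap" now holds a file location, not the mapping JSON itself
String protoMappingPath = parser.get("pathMap");
// new parameter: the filesystem to resolve it against (e.g. hdfs://..., or "local" in tests)
String hdfsNameNode = parser.get("nameNode");

Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", hdfsNameNode);
FileSystem fs = FileSystem.get(configuration);

// read the whole mapping file; no argument-length limit applies here
String json = IOUtils.toString(fs.open(new Path(protoMappingPath)), StandardCharsets.UTF_8);
ProtoMap protoMap = new Gson().fromJson(json, ProtoMap.class);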

View File

@@ -4,10 +4,23 @@ package eu.dnetlib.dhp.bulktag;
 import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.*;
 import java.util.stream.Collectors;
+import java.util.zip.GZIPOutputStream;
+import org.apache.avro.TestAnnotation;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
@@ -18,8 +31,10 @@ import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.Gson;
+import com.sun.media.sound.ModelInstrumentComparator;
 import eu.dnetlib.dhp.api.Utils;
 import eu.dnetlib.dhp.api.model.CommunityEntityMap;
@@ -73,8 +88,20 @@ public class SparkBulkTagJob {
 		log.info("baseURL: {}", baseURL);
 		log.info("pathMap: {}", parser.get("pathMap"));
-		ProtoMap protoMappingParams = new Gson().fromJson(parser.get("pathMap") + "}}", ProtoMap.class);
-		log.info("pathMap: {}", new Gson().toJson(protoMappingParams));
+		String protoMappingPath = parser.get("pathMap");
+		// log.info("pathMap: {}", new Gson().toJson(protoMappingParams));
+		final String hdfsNameNode = parser.get("nameNode");
+		log.info("nameNode: {}", hdfsNameNode);
+		Configuration configuration = new Configuration();
+		configuration.set("fs.defaultFS", hdfsNameNode);
+		FileSystem fs = FileSystem.get(configuration);
+		String temp = IOUtils.toString(fs.open(new Path(protoMappingPath)), StandardCharsets.UTF_8);
+		log.info("protoMap: {}", temp);
+		ProtoMap protoMap = new Gson().fromJson(temp, ProtoMap.class);
+		log.info("pathMap: {}", new Gson().toJson(protoMap));
 		SparkConf conf = new SparkConf();
 		CommunityConfiguration cc;
@@ -96,7 +123,8 @@ public class SparkBulkTagJob {
 			isSparkSessionManaged,
 			spark -> {
 				extendCommunityConfigurationForEOSC(spark, inputPath, cc);
-				execBulkTag(spark, inputPath, outputPath, protoMappingParams, cc);
+				execBulkTag(
+					spark, inputPath, outputPath, protoMap, cc);
 				execDatasourceTag(spark, inputPath, outputPath, Utils.getDatasourceCommunities(baseURL));
 				execProjectTag(spark, inputPath, outputPath, Utils.getCommunityProjects(baseURL));
 			});
@@ -256,6 +284,11 @@ public class SparkBulkTagJob {
 		ProtoMap protoMappingParams,
 		CommunityConfiguration communityConfiguration) {
+		try {
+			System.out.println(new ObjectMapper().writeValueAsString(protoMappingParams));
+		} catch (JsonProcessingException e) {
+			throw new RuntimeException(e);
+		}
 		ModelSupport.entityTypes
 			.keySet()
 			.parallelStream()

View File

@@ -7,8 +7,8 @@ import java.util.HashMap;
 import eu.dnetlib.dhp.bulktag.actions.MapModel;

 public class ProtoMap extends HashMap<String, MapModel> implements Serializable {

 	public ProtoMap() {
 		super();
 	}
 }
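
ProtoMap is just a HashMap keyed by field name, so Gson can populate it directly from the mapping file added in this commit. MapModel itself is not part of the diff; the sketch below is a hypothetical reconstruction of the shape the JSON implies (a "path" plus an optional "action"), not the actual class:

import java.io.Serializable;

// Hypothetical sketch of MapModel, inferred from the pathMap JSON shipped in
// this commit; the real class in eu.dnetlib.dhp.bulktag.actions may differ.
public class MapModel implements Serializable {

	private String path; // JsonPath expression used to extract the field value
	private Object action; // optional post-processing step, e.g. ExecSubstringAction

	public String getPath() { return path; }
	public void setPath(String path) { this.path = path; }
	public Object getAction() { return action; }
	public void setAction(Object action) { this.action = action; }
}

With a shape like this, new Gson().fromJson(json, ProtoMap.class).get("author").getPath() would return "$['author'][*]['fullname']".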

View File

@@ -1,4 +1,4 @@
-sourcePath=/tmp/beta_provision/graph/10_graph_orcid_enriched
+sourcePath=/tmp/beta_provision/graph/09_graph_orcid_enriched
 resumeFrom=ResultProject
 allowedsemrelsorcidprop=isSupplementedBy;isSupplementTo
 allowedsemrelsresultproject=isSupplementedBy;isSupplementTo

View File

@@ -34,5 +34,10 @@
 		"paramLongName": "baseURL",
 		"paramDescription": "this parameter is to specify the api to be queried (beta or production)",
 		"paramRequired": true
-	}
+	},{
+		"paramName": "nn",
+		"paramLongName": "nameNode",
+		"paramDescription": "this parameter is to specify the api to be queried (beta or production)",
+		"paramRequired": true
+	}
 ]
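
For context, these definitions are consumed through the argument parser used across dhp jobs; a sketch of the usual pattern, assuming ArgumentApplicationParser from dhp-common (the resource name below is illustrative, not taken from this commit):

import org.apache.commons.io.IOUtils;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;

public class ParserSketch {
	public static void main(String[] args) throws Exception {
		// load the parameter definitions (illustrative resource name)
		String jsonConfiguration = IOUtils
			.toString(ParserSketch.class.getResourceAsStream("/eu/dnetlib/dhp/bulktag/input_bulkTag_parameters.json"));
		ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args); // matches either -nn or -nameNode
		System.out.println(parser.get("nameNode"));
	}
}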

View File

@@ -53,10 +53,10 @@
 	</property>
 	<property>
 		<name>memoryOverhead</name>
-		<value>3G</value>
+		<value>4G</value>
 	</property>
 	<property>
 		<name>partitions</name>
-		<value>3284</value>
+		<value>15000</value>
 	</property>
 </configuration>

View File

@@ -76,6 +76,7 @@
 				<arg>--outputPath</arg><arg>${workingDir}/bulktag/</arg>
 				<arg>--pathMap</arg><arg>${pathMap}</arg>
 				<arg>--baseURL</arg><arg>${baseURL}</arg>
+				<arg>--nameNode</arg><arg>${nameNode}</arg>
 			</spark>
 			<ok to="End"/>
 			<error to="Kill"/>

View File

@@ -6,14 +6,19 @@ import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.ZENODO_COMMUNITY
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -25,14 +30,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
+import eu.dnetlib.dhp.bulktag.community.ProtoMap;
 import eu.dnetlib.dhp.schema.oaf.*;
 public class BulkTagJobTest {
 	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-	public static final String pathMap = "{\"author\":{\"path\":\"$['author'][*]['fullname']\"}," +
+	public static final String pathMap = "{\"protoMap\":{\"author\":{\"path\":\"$['author'][*]['fullname']\"}," +
 		" \"title\":{\"path\":\"$['title'][*]['value']\"}, " +
 		" \"orcid\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']\"} , " +
 		" \"orcid_pending\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']\"} ,"
@@ -51,7 +58,7 @@ public class BulkTagJobTest {
 		"\"method\":\"execSubstring\"," +
 		"\"params\":[" +
 		"{\"paramName\":\"From\", \"paramValue\":0}, " +
-		"{\"paramName\":\"To\",\"paramValue\":4}]}}}";
+		"{\"paramName\":\"To\",\"paramValue\":4}]}}}}";
 	private static SparkSession spark;
@@ -231,6 +238,14 @@ public class BulkTagJobTest {
 	@Test
 	void bulktagBySubjectPreviousContextNoProvenanceTest() throws Exception {
+		LocalFileSystem fs = FileSystem.getLocal(new Configuration());
+		fs
+			.copyFromLocalFile(
+				false, new org.apache.hadoop.fs.Path(getClass()
+					.getResource("/eu/dnetlib/dhp/bulktag/pathMap/")
+					.getPath()),
+				new org.apache.hadoop.fs.Path(workingDir.toString() + "/data/bulktagging/protoMap"));
 		final String sourcePath = getClass()
 			.getResource(
 				"/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/contextnoprovenance/")
@@ -246,7 +261,8 @@ public class BulkTagJobTest {
 				"-outputPath", workingDir.toString() + "/",
-				"-pathMap", pathMap
+				"-pathMap", workingDir.toString() + "/data/bulktagging/protoMap",
+				"-nameNode", "local"
 			});
 		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
@@ -316,6 +332,7 @@ public class BulkTagJobTest {
 		final String sourcePath = getClass()
 			.getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/")
 			.getPath();
+
 		SparkBulkTagJob
 			.main(
 				new String[] {
@@ -390,6 +407,13 @@ public class BulkTagJobTest {
 		final String sourcePath = getClass()
 			.getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/")
 			.getPath();
+		LocalFileSystem fs = FileSystem.getLocal(new Configuration());
+		fs
+			.copyFromLocalFile(
+				false, new org.apache.hadoop.fs.Path(getClass()
+					.getResource("/eu/dnetlib/dhp/bulktag/pathMap/")
+					.getPath()),
+				new org.apache.hadoop.fs.Path(workingDir.toString() + "/data/bulktagging/protoMap"));
 		SparkBulkTagJob
 			.main(
 				new String[] {
@@ -400,7 +424,9 @@ public class BulkTagJobTest {
 				"-outputPath", workingDir.toString() + "/",
 				"-baseURL", "https://services.openaire.eu/openaire/community/",
-				"-pathMap", pathMap
+				"-pathMap", workingDir.toString() + "/data/bulktagging/protoMap/pathMap",
+				"-nameNode", "local"
 			});
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@@ -1757,4 +1783,40 @@ public class BulkTagJobTest {
 	}
+
+	@Test
+	public void prova() throws Exception {
+		LocalFileSystem fs = FileSystem.getLocal(new Configuration());
+		fs
+			.copyFromLocalFile(
+				false, new org.apache.hadoop.fs.Path(getClass()
+					.getResource("/eu/dnetlib/dhp/bulktag/pathMap/")
+					.getPath()),
+				new org.apache.hadoop.fs.Path(workingDir.toString() + "/data/bulktagging/protoMap"));
+		final String sourcePath = getClass()
+			.getResource(
+				"/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/contextnoprovenance/")
+			.getPath();
+		ProtoMap prova = new Gson()
+			.fromJson(
+				"{\"author\":{\"path\":\"$['author'][*]['fullname']\"},\"title\":{\"path\":\"$['title'][*]['value']\"},\"orcid\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']\"},\"orcid_pending\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']\"},\"contributor\":{\"path\":\"$['contributor'][*]['value']\"},\"description\":{\"path\":\"$['description'][*]['value']\"},\"subject\":{\"path\":\"$['subject'][*]['value']\"},\"fos\":{\"path\":\"$['subject'][?(@['qualifier']['classid']=='FOS')].value\"},\"sdg\":{\"path\":\"$['subject'][?(@['qualifier']['classid']=='SDG')].value\"},\"journal\":{\"path\":\"$['journal'].name\"},\"hostedby\":{\"path\":\"$['instance'][*]['hostedby']['key']\"},\"collectedfrom\":{\"path\":\"$['instance'][*]['collectedfrom']['key']\"},\"publisher\":{\"path\":\"$['publisher'].value\"},\"publicationyear\":{\"path\":\"$['dateofacceptance'].value\",\"action\":{\"clazz\":\"eu.dnetlib.dhp.bulktag.actions.ExecSubstringAction\",\"method\":\"execSubstring\",\"params\":[{\"paramName\":\"From\",\"paramValue\":0},{\"paramName\":\"To\",\"paramValue\":4}]}}}",
+				ProtoMap.class);
+		SparkBulkTagJob
+			.main(
+				new String[] {
+					"-isSparkSessionManaged", Boolean.FALSE.toString(),
+					"-sourcePath", sourcePath,
+					"-taggingConf", taggingConf,
+					"-outputPath", workingDir.toString() + "/",
+					"-pathMap", workingDir.toString() + "/data/bulktagging/protoMap/pathMap",
+					"-baseURL", "none",
+					"-nameNode", "local"
+				});
+	}
 }

View File

@@ -0,0 +1,58 @@
+{
+	"author":{
+		"path":"$['author'][*]['fullname']"
+	},
+	"title":{
+		"path":"$['title'][*]['value']"
+	},
+	"orcid":{
+		"path":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']"
+	},
+	"orcid_pending":{
+		"path":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']"
+	},
+	"contributor":{
+		"path":"$['contributor'][*]['value']"
+	},
+	"description":{
+		"path":"$['description'][*]['value']"
+	},
+	"subject":{
+		"path":"$['subject'][*]['value']"
+	},
+	"fos":{
+		"path":"$['subject'][?(@['qualifier']['classid']=='FOS')].value"
+	},
+	"sdg":{
+		"path":"$['subject'][?(@['qualifier']['classid']=='SDG')].value"
+	},
+	"journal":{
+		"path":"$['journal'].name"
+	},
+	"hostedby":{
+		"path":"$['instance'][*]['hostedby']['key']"
+	},
+	"collectedfrom":{
+		"path":"$['instance'][*]['collectedfrom']['key']"
+	},
+	"publisher":{
+		"path":"$['publisher'].value"
+	},
+	"publicationyear":{
+		"path":"$['dateofacceptance'].value",
+		"action":{
+			"clazz":"eu.dnetlib.dhp.bulktag.actions.ExecSubstringAction",
+			"method":"execSubstring",
+			"params":[
+				{
+					"paramName":"From",
+					"paramValue":0
+				},
+				{
+					"paramName":"To",
+					"paramValue":4
+				}
+			]
+		}
+	}
+}
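
Each entry in this file pairs a record field with a JsonPath expression to be evaluated against the serialized result; a small standalone example of such an evaluation, assuming the Jayway json-path library that expressions of this form target:

import java.util.List;

import com.jayway.jsonpath.JsonPath;

public class PathMapExample {
	public static void main(String[] args) {
		// toy record with the shape the "author" entry above expects
		String record = "{\"author\":[{\"fullname\":\"Baglioni, Miriam\"},{\"fullname\":\"Doe, Jane\"}]}";
		// same expression as the "author" entry: one value per author
		List<String> fullnames = JsonPath.read(record, "$['author'][*]['fullname']");
		System.out.println(fullnames); // [Baglioni, Miriam, Doe, Jane]
	}
}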