diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java index 10e9f6e0e..f78a4de55 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java @@ -4,10 +4,23 @@ package eu.dnetlib.dhp.bulktag; import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.*; import java.util.stream.Collectors; +import java.util.zip.GZIPOutputStream; +import org.apache.avro.TestAnnotation; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; @@ -18,8 +31,10 @@ import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; +import com.sun.media.sound.ModelInstrumentComparator; import eu.dnetlib.dhp.api.Utils; import eu.dnetlib.dhp.api.model.CommunityEntityMap; @@ -73,8 +88,20 @@ public class SparkBulkTagJob { log.info("baseURL: {}", baseURL); log.info("pathMap: {}", 
parser.get("pathMap")); - ProtoMap protoMappingParams = new Gson().fromJson(parser.get("pathMap") + "}}", ProtoMap.class); - log.info("pathMap: {}", new Gson().toJson(protoMappingParams)); + String protoMappingPath = parser.get("pathMap"); + // log.info("pathMap: {}", new Gson().toJson(protoMappingParams)); + + final String hdfsNameNode = parser.get("nameNode"); + log.info("nameNode: {}", hdfsNameNode); + + Configuration configuration = new Configuration(); + configuration.set("fs.defaultFS", hdfsNameNode); + FileSystem fs = FileSystem.get(configuration); + + String temp = IOUtils.toString(fs.open(new Path(protoMappingPath)), StandardCharsets.UTF_8); + log.info("protoMap: {}", temp); + ProtoMap protoMap = new Gson().fromJson(temp, ProtoMap.class); + log.info("pathMap: {}", new Gson().toJson(protoMap)); SparkConf conf = new SparkConf(); CommunityConfiguration cc; @@ -96,7 +123,8 @@ public class SparkBulkTagJob { isSparkSessionManaged, spark -> { extendCommunityConfigurationForEOSC(spark, inputPath, cc); - execBulkTag(spark, inputPath, outputPath, protoMappingParams, cc); + execBulkTag( + spark, inputPath, outputPath, protoMap, cc); execDatasourceTag(spark, inputPath, outputPath, Utils.getDatasourceCommunities(baseURL)); execProjectTag(spark, inputPath, outputPath, Utils.getCommunityProjects(baseURL)); }); @@ -256,6 +284,11 @@ public class SparkBulkTagJob { ProtoMap protoMappingParams, CommunityConfiguration communityConfiguration) { + try { + System.out.println(new ObjectMapper().writeValueAsString(protoMappingParams)); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } ModelSupport.entityTypes .keySet() .parallelStream() diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ProtoMap.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ProtoMap.java index dc75aec37..41edbcec6 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ProtoMap.java 
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ProtoMap.java @@ -7,8 +7,8 @@ import java.util.HashMap; import eu.dnetlib.dhp.bulktag.actions.MapModel; public class ProtoMap extends HashMap implements Serializable { - public ProtoMap() { super(); } + } diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties index f252f6463..52c2cafce 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties @@ -1,4 +1,4 @@ -sourcePath=/tmp/beta_provision/graph/10_graph_orcid_enriched +sourcePath=/tmp/beta_provision/graph/09_graph_orcid_enriched resumeFrom=ResultProject allowedsemrelsorcidprop=isSupplementedBy;isSupplementTo allowedsemrelsresultproject=isSupplementedBy;isSupplementTo diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json index f8fe91223..36c9600fe 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json @@ -34,5 +34,10 @@ "paramLongName": "baseURL", "paramDescription": "this parameter is to specify the api to be queried (beta or production)", "paramRequired": true - } + },{ + "paramName": "nn", + "paramLongName": "nameNode", + "paramDescription": "this parameter specifies the name node of the cluster hosting the pathMap file", + "paramRequired": true +} ] \ No newline at end of file diff --git 
a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/config-default.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/config-default.xml index 2695253e6..c1675239c 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/config-default.xml @@ -53,10 +53,10 @@ memoryOverhead - 3G + 4G partitions - 3284 + 15000 \ No newline at end of file diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml index c7a9e8a26..c4b4b7d64 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml @@ -76,6 +76,7 @@ --outputPath${workingDir}/bulktag/ --pathMap${pathMap} --baseURL${baseURL} + --nameNode${nameNode} diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java index 25ed68e03..4d563866b 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java @@ -6,14 +6,19 @@ import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.ZENODO_COMMUNITY import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Arrays; import java.util.List; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import 
org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; @@ -25,14 +30,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.Gson; +import eu.dnetlib.dhp.bulktag.community.ProtoMap; import eu.dnetlib.dhp.schema.oaf.*; public class BulkTagJobTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static final String pathMap = "{\"author\":{\"path\":\"$['author'][*]['fullname']\"}," + + public static final String pathMap = "{\"protoMap\":{\"author\":{\"path\":\"$['author'][*]['fullname']\"}," + " \"title\":{\"path\":\"$['title'][*]['value']\"}, " + " \"orcid\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']\"} , " + " \"orcid_pending\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']\"} ," @@ -51,7 +58,7 @@ public class BulkTagJobTest { "\"method\":\"execSubstring\"," + "\"params\":[" + "{\"paramName\":\"From\", \"paramValue\":0}, " + - "{\"paramName\":\"To\",\"paramValue\":4}]}}}"; + "{\"paramName\":\"To\",\"paramValue\":4}]}}}}"; private static SparkSession spark; @@ -231,6 +238,14 @@ public class BulkTagJobTest { @Test void bulktagBySubjectPreviousContextNoProvenanceTest() throws Exception { + LocalFileSystem fs = FileSystem.getLocal(new Configuration()); + fs + .copyFromLocalFile( + false, new org.apache.hadoop.fs.Path(getClass() + .getResource("/eu/dnetlib/dhp/bulktag/pathMap/") + .getPath()), + new org.apache.hadoop.fs.Path(workingDir.toString() + "/data/bulktagging/protoMap")); + final String 
sourcePath = getClass() .getResource( "/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/contextnoprovenance/") @@ -246,7 +261,8 @@ public class BulkTagJobTest { "-outputPath", workingDir.toString() + "/", - "-pathMap", pathMap + "-pathMap", workingDir.toString() + "/data/bulktagging/protoMap", + "-nameNode", "local" }); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); @@ -316,6 +332,7 @@ public class BulkTagJobTest { final String sourcePath = getClass() .getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/") .getPath(); + SparkBulkTagJob .main( new String[] { @@ -390,6 +407,13 @@ public class BulkTagJobTest { final String sourcePath = getClass() .getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/") .getPath(); + LocalFileSystem fs = FileSystem.getLocal(new Configuration()); + fs + .copyFromLocalFile( + false, new org.apache.hadoop.fs.Path(getClass() + .getResource("/eu/dnetlib/dhp/bulktag/pathMap/") + .getPath()), + new org.apache.hadoop.fs.Path(workingDir.toString() + "/data/bulktagging/protoMap")); SparkBulkTagJob .main( new String[] { @@ -400,7 +424,9 @@ public class BulkTagJobTest { "-outputPath", workingDir.toString() + "/", "-baseURL", "https://services.openaire.eu/openaire/community/", - "-pathMap", pathMap + + "-pathMap", workingDir.toString() + "/data/bulktagging/protoMap/pathMap", + "-nameNode", "local" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -1757,4 +1783,40 @@ public class BulkTagJobTest { } + @Test + public void prova() throws Exception { + LocalFileSystem fs = FileSystem.getLocal(new Configuration()); + fs + .copyFromLocalFile( + false, new org.apache.hadoop.fs.Path(getClass() + .getResource("/eu/dnetlib/dhp/bulktag/pathMap/") + .getPath()), + new org.apache.hadoop.fs.Path(workingDir.toString() + "/data/bulktagging/protoMap")); + + final String sourcePath = getClass() + .getResource( + 
"/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/contextnoprovenance/") + .getPath(); + + ProtoMap prova = new Gson() + .fromJson( + "{\"author\":{\"path\":\"$['author'][]['fullname']\"},\"title\":{\"path\":\"$['title'][]['value']\"},\"orcid\":{\"path\":\"$['author'][]['pid'][][?(@['qualifier']['classid']=='orcid')]['value']\"},\"orcid_pending\":{\"path\":\"$['author'][]['pid'][][?(@['qualifier']['classid']=='orcid_pending')]['value']\"},\"contributor\":{\"path\":\"$['contributor'][]['value']\"},\"description\":{\"path\":\"$['description'][]['value']\"},\"subject\":{\"path\":\"$['subject'][]['value']\"},\"fos\":{\"path\":\"$['subject'][?(@['qualifier']['classid']=='FOS')].value\"},\"sdg\":{\"path\":\"$['subject'][?(@['qualifier']['classid']=='SDG')].value\"},\"journal\":{\"path\":\"$['journal'].name\"},\"hostedby\":{\"path\":\"$['instance'][]['hostedby']['key']\"},\"collectedfrom\":{\"path\":\"$['instance'][*]['collectedfrom']['key']\"},\"publisher\":{\"path\":\"$['publisher'].value\"},\"publicationyear\":{\"path\":\"$['dateofacceptance'].value\",\"action\":{\"clazz\":\"eu.dnetlib.dhp.bulktag.actions.ExecSubstringAction\",\"method\":\"execSubstring\",\"params\":[{\"paramName\":\"From\",\"paramValue\":0},{\"paramName\":\"To\",\"paramValue\":4}]}}}", + ProtoMap.class); + SparkBulkTagJob + .main( + new String[] { + + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-sourcePath", sourcePath, + "-taggingConf", taggingConf, + + "-outputPath", workingDir.toString() + "/", + + "-pathMap", workingDir.toString() + "/data/bulktagging/protoMap/pathMap", + "-baseURL", "none", + "-nameNode", "local" + }); + + } + } diff --git a/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/bulktag/pathMap/pathMap b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/bulktag/pathMap/pathMap new file mode 100644 index 000000000..e7bbfe941 --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/bulktag/pathMap/pathMap @@ -0,0 +1,58 
@@ +{ + "author":{ + "path":"$['author'][*]['fullname']" + }, + "title":{ + "path":"$['title'][*]['value']" + }, + "orcid":{ + "path":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']" + }, + "orcid_pending":{ + "path":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']" + }, + "contributor":{ + "path":"$['contributor'][*]['value']" + }, + "description":{ + "path":"$['description'][*]['value']" + }, + "subject":{ + "path":"$['subject'][*]['value']" + }, + "fos":{ + "path":"$['subject'][?(@['qualifier']['classid']=='FOS')].value" + }, + "sdg":{ + "path":"$['subject'][?(@['qualifier']['classid']=='SDG')].value" + }, + "journal":{ + "path":"$['journal'].name" + }, + "hostedby":{ + "path":"$['instance'][*]['hostedby']['key']" + }, + "collectedfrom":{ + "path":"$['instance'][*]['collectedfrom']['key']" + }, + "publisher":{ + "path":"$['publisher'].value" + }, + "publicationyear":{ + "path":"$['dateofacceptance'].value", + "action":{ + "clazz":"eu.dnetlib.dhp.bulktag.actions.ExecSubstringAction", + "method":"execSubstring", + "params":[ + { + "paramName":"From", + "paramValue":0 + }, + { + "paramName":"To", + "paramValue":4 + } + ] + } + } +} \ No newline at end of file