diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java
index 4c6d0653eb..67966d523a 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java
@@ -7,7 +7,6 @@
 import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
-import eu.dnetlib.dhp.collection.plugin.zenodo.CollectZenodoDumpCollectorPlugin;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
@@ -29,6 +28,7 @@ import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin;
 import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
 import eu.dnetlib.dhp.collection.plugin.osf.OsfPreprintsCollectorPlugin;
 import eu.dnetlib.dhp.collection.plugin.rest.RestCollectorPlugin;
+import eu.dnetlib.dhp.collection.plugin.zenodo.CollectZenodoDumpCollectorPlugin;
 import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
 import eu.dnetlib.dhp.common.collection.CollectorException;
 import eu.dnetlib.dhp.common.collection.HttpClientParams;
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala
index c72b366a0f..ea2177497a 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala
@@ -503,7 +503,6 @@ case object Crossref2Oaf {
       )
     }
 
-
     if (doi.startsWith("10.3410") || doi.startsWith("10.12703"))
       instance.setHostedby(
         OafMapperUtils.keyValue(OafMapperUtils.createOpenaireId(10, "openaire____::H1Connect", true), "H1Connect")
@@ -556,14 +555,18 @@ case object Crossref2Oaf {
     result
   }
 
-
-
  def generateIdentifier(oaf: Result, doi: String): String = {
    val id = DHPUtils.md5(doi.toLowerCase)
    s"50|doiboost____|$id"
  }
 
-  private def generateAuthor(given: String, family: String, orcid: String, index: Int, affiliation: Option[List[mappingAffiliation]]): Author = {
+  private def generateAuthor(
+    given: String,
+    family: String,
+    orcid: String,
+    index: Int,
+    affiliation: Option[List[mappingAffiliation]]
+  ): Author = {
     val a = new Author
     a.setName(given)
     a.setSurname(family)
@@ -700,7 +703,6 @@ case object Crossref2Oaf {
     if (objectType == null)
       return resultList
 
-
     // If the item has a relations is-review-of, then we force it to a peer-review
     val is_review = json \ "relation" \ "is-review-of" \ "id"
     var force_to_review = false
@@ -713,7 +715,6 @@ case object Crossref2Oaf {
     if (typology == null)
       return List()
 
-
     val result = generateItemFromType(typology._2)
     if (result == null)
       return List()
diff --git a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/crossref/CrossrefMappingTest.scala b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/crossref/CrossrefMappingTest.scala
index 12ca14ba14..ebe247d8aa 100644
--- a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/crossref/CrossrefMappingTest.scala
+++ b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/crossref/CrossrefMappingTest.scala
@@ -28,17 +28,21 @@ class CrossrefMappingTest extends AbstractVocabularyTest {
     val input =
       IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/collection/crossref/issn_pub.json"), "utf-8")
 
-    Crossref2Oaf.convert(input, vocabularies, TransformationType.All).foreach(record => {
-      Assertions.assertNotNull(record)
-    })
+    Crossref2Oaf
+      .convert(input, vocabularies, TransformationType.All)
+      .foreach(record => {
+        Assertions.assertNotNull(record)
+      })
   }
 
-
   @Test
   def mappingAffiliation(): Unit = {
     val input =
-      IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/collection/crossref/affiliationTest.json"), "utf-8")
+      IOUtils.toString(
+        getClass.getResourceAsStream("/eu/dnetlib/dhp/collection/crossref/affiliationTest.json"),
+        "utf-8"
+      )
     val data = Crossref2Oaf.convert(input, vocabularies, TransformationType.OnlyResult)
     data.foreach(record => {
       Assertions.assertNotNull(record)
@@ -46,10 +50,10 @@ class CrossrefMappingTest extends AbstractVocabularyTest {
       val publication = record.asInstanceOf[Publication]
       publication.getAuthor.asScala.foreach(author => {
         Assertions.assertNotNull(author.getRawAffiliationString)
-        Assertions.assertTrue(author.getRawAffiliationString.size()>0)
+        Assertions.assertTrue(author.getRawAffiliationString.size() > 0)
 
-      })
     })
+    })
     println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(data.head))
   }
 }
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java
index 764390442d..ecb7cc8272 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java
@@ -1,16 +1,16 @@
 package eu.dnetlib.dhp.resulttocommunityfromsemrel;
 
+import static java.lang.String.join;
+
 import static eu.dnetlib.dhp.PropagationConstant.*;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
-import static java.lang.String.join;
 
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-import eu.dnetlib.dhp.schema.common.ModelConstants;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.*;
@@ -22,6 +22,7 @@ import com.google.gson.Gson;
 import eu.dnetlib.dhp.api.Utils;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
@@ -37,8 +38,7 @@ public class PrepareResultCommunitySetStep1 {
     * relation
     */
    // TODO
-    private static final String RESULT_CONTEXT_QUERY_TEMPLATE =
-        "select target resultId, community_context "
+    private static final String RESULT_CONTEXT_QUERY_TEMPLATE = "select target resultId, community_context "
        + "from (select id, collect_set(co.id) community_context "
        + " from result "
        + " lateral view explode (context) c as co "
@@ -60,26 +60,26 @@ public class PrepareResultCommunitySetStep1 {
        + "where length(co) > 0 "
        + "group by resultId";
 
-    private static final String RESULT_CONTEXT_QUERY_TEMPLATE_IS_RELATED_TO =
-        "select target as resultId, community_context " +
-        "from resultWithContext rwc " +
-        "join relatedToRelations r " +
-        "join patents p " +
-        "on rwc.id = r.source and r.target = p.id";
r.source and r.target = p.id"; + private static final String RESULT_CONTEXT_QUERY_TEMPLATE_IS_RELATED_TO = "select target as resultId, community_context " + + + "from resultWithContext rwc " + + "join relatedToRelations r " + + "join patents p " + + "on rwc.id = r.source and r.target = p.id"; private static final String RESULT_WITH_CONTEXT = "select id, collect_set(co.id) community_context \n" + - " from result " + - " lateral view explode (context) c as co " + - " where lower(co.id) IN %s" + - " group by id"; + " from result " + + " lateral view explode (context) c as co " + + " where lower(co.id) IN %s" + + " group by id"; private static final String RESULT_PATENT = "select id " + - " from result " + - " where array_contains(instance.instancetype.classname, 'Patent')"; + " from result " + + " where array_contains(instance.instancetype.classname, 'Patent')"; private static final String IS_RELATED_TO_RELATIONS = "select source, target " + - " from relation " + - " where lower(relClass) = 'isrelatedto' and datainfo.deletedbyinference = false"; + " from relation " + + " where lower(relClass) = 'isrelatedto' and datainfo.deletedbyinference = false"; public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils @@ -107,17 +107,25 @@ public class PrepareResultCommunitySetStep1 { SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - final String allowedsemrel ="(" + join(",", - Arrays.asList(parser.get("allowedsemrels").split(";")).stream().map(value -> "'" + value.toLowerCase() + "'") - .toArray(String[]::new)) + ")"; + final String allowedsemrel = "(" + join( + ",", + Arrays + .asList(parser.get("allowedsemrels").split(";")) + .stream() + .map(value -> "'" + value.toLowerCase() + "'") + .toArray(String[]::new)) + + ")"; log.info("allowedSemRel: {}", allowedsemrel); final String baseURL = parser.get("baseURL"); log.info("baseURL: {}", baseURL); - final String communityIdList = "(" + join(",", getCommunityList(baseURL).stream() + final String communityIdList = "(" + join( + ",", getCommunityList(baseURL) + .stream() .map(value -> "'" + value.toLowerCase() + "'") - .toArray(String[]::new)) + ")"; + .toArray(String[]::new)) + + ")"; final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase(); log.info("resultType: {}", resultType); @@ -161,18 +169,17 @@ public class PrepareResultCommunitySetStep1 { relation.createOrReplaceTempView("relation"); Dataset result = readPath(spark, inputResultPath, resultClazz) - .where("datainfo.deletedbyinference != true AND datainfo.invisible != true"); + .where("datainfo.deletedbyinference != true AND datainfo.invisible != true"); result.createOrReplaceTempView("result"); final String outputResultPath = outputPath + "/" + resultType; log.info("writing output results to: {}", outputResultPath); - String resultContextQuery = String .format( RESULT_CONTEXT_QUERY_TEMPLATE, - "AND lower(co.id) IN " + communityIdList, - "AND lower(relClass) IN " + allowedsemrel); + "AND lower(co.id) IN " + communityIdList, + "AND lower(relClass) IN " + allowedsemrel); Dataset result_context = spark.sql(resultContextQuery); Dataset rwc = spark.sql(String.format(RESULT_WITH_CONTEXT, communityIdList)); @@ -183,18 +190,17 @@ public class PrepareResultCommunitySetStep1 { patents.createOrReplaceTempView("patents"); relatedToRelations.createOrReplaceTempView("relatedTorelations"); - - result_context = result_context.unionAll( 
-            spark.sql(RESULT_CONTEXT_QUERY_TEMPLATE_IS_RELATED_TO));
+        result_context = result_context.unionAll(spark.sql(RESULT_CONTEXT_QUERY_TEMPLATE_IS_RELATED_TO));
 
        result_context.createOrReplaceTempView("result_context");
 
        spark
-                .sql(RESULT_COMMUNITY_LIST_QUERY)
-                .as(Encoders.bean(ResultCommunityList.class))
-                .write()
-                .option("compression", "gzip")
-                .mode(SaveMode.Append)
-                .json(outputResultPath);
+            .sql(RESULT_COMMUNITY_LIST_QUERY)
+            .as(Encoders.bean(ResultCommunityList.class))
+            .write()
+            .option("compression", "gzip")
+            .mode(SaveMode.Append)
+            .json(outputResultPath);
 
    }
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep2.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep2.java
index 9bebc36e58..9801b1bf6e 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep2.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep2.java
@@ -77,7 +77,7 @@ public class PrepareResultCommunitySetStep2 {
            if (b == null) {
                return a;
            }
-            Set community_set = new HashSet<>(a.getCommunityList());
+            Set<String> community_set = new HashSet<>(a.getCommunityList());
            community_set.addAll(b.getCommunityList());
            a.setCommunityList(new ArrayList<>(community_set));
            return a;
diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/ResultToCommunityJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/ResultToCommunityJobTest.java
index c1fcff4d99..2b52c91de4 100644
--- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/ResultToCommunityJobTest.java
+++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/ResultToCommunityJobTest.java
@@ -10,7 +10,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
 import org.apache.commons.io.FileUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
@@ -27,6 +26,7 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
 import eu.dnetlib.dhp.schema.oaf.Dataset;
 import scala.collection.Seq;
 
@@ -279,53 +279,55 @@ public class ResultToCommunityJobTest {
    @Test
    public void prepareStep1Test() throws Exception {
        /*
-
-
-        final String allowedsemrel = join(",", Arrays.stream(parser.get("allowedsemrels").split(";"))
-            .map(value -> "'" + value.toLowerCase() + "'")
-            .toArray(String[]::new));
-
-        log.info("allowedSemRel: {}", new Gson().toJson(allowedsemrel));
-
-        final String baseURL = parser.get("baseURL");
-        log.info("baseURL: {}", baseURL);
+         * final String allowedsemrel = join(",", Arrays.stream(parser.get("allowedsemrels").split(";")) .map(value ->
+         * "'" + value.toLowerCase() + "'") .toArray(String[]::new)); log.info("allowedSemRel: {}", new
+         * Gson().toJson(allowedsemrel)); final String baseURL = parser.get("baseURL"); log.info("baseURL: {}",
+         * baseURL);
         */
        PrepareResultCommunitySetStep1
-            .main(
-                new String[] {
-                    "-isSparkSessionManaged", Boolean.FALSE.toString(),
-                    "-sourcePath", getClass()
-                        .getResource("/eu/dnetlib/dhp/resulttocommunityfromsemrel/graph")
-                        .getPath(),
"-hive_metastore_uris", "", - "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication", - "-outputPath", workingDir.toString() + "/preparedInfo", - "-allowedsemrels","issupplementto;issupplementedby", - "-baseURL","https://dev-openaire.d4science.org/openaire/community/" - }); + .main( + new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-sourcePath", getClass() + .getResource("/eu/dnetlib/dhp/resulttocommunityfromsemrel/graph") + .getPath(), + "-hive_metastore_uris", "", + "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication", + "-outputPath", workingDir.toString() + "/preparedInfo", + "-allowedsemrels", "issupplementto;issupplementedby", + "-baseURL", "https://dev-openaire.d4science.org/openaire/community/" + }); - - org.apache.spark.sql.Dataset resultCommunityList = spark.read().schema(Encoders.bean(ResultCommunityList.class).schema()) - .json(workingDir.toString() + "/preparedInfo/publication") - .as(Encoders.bean(ResultCommunityList.class)); + org.apache.spark.sql.Dataset resultCommunityList = spark + .read() + .schema(Encoders.bean(ResultCommunityList.class).schema()) + .json(workingDir.toString() + "/preparedInfo/publication") + .as(Encoders.bean(ResultCommunityList.class)); Assertions.assertEquals(2, resultCommunityList.count()); - Assertions.assertEquals(1,resultCommunityList.filter("resultId = '50|dedup_wf_001::06e51d2bf295531b2d2e7a1b55500783'").count()); - Assertions.assertEquals(1,resultCommunityList.filter("resultId = '50|pending_org_::82f63b2d21ae88596b9d8991780e9888'").count()); + Assertions + .assertEquals( + 1, + resultCommunityList.filter("resultId = '50|dedup_wf_001::06e51d2bf295531b2d2e7a1b55500783'").count()); + Assertions + .assertEquals( + 1, + resultCommunityList.filter("resultId = '50|pending_org_::82f63b2d21ae88596b9d8991780e9888'").count()); ArrayList communities = resultCommunityList - .filter("resultId = '50|dedup_wf_001::06e51d2bf295531b2d2e7a1b55500783'") - .first().getCommunityList(); + .filter("resultId = '50|dedup_wf_001::06e51d2bf295531b2d2e7a1b55500783'") + .first() + .getCommunityList(); Assertions.assertEquals(2, communities.size()); Assertions.assertTrue(communities.stream().anyMatch(cid -> "beopen".equals(cid))); Assertions.assertTrue(communities.stream().anyMatch(cid -> "dh-ch".equals(cid))); communities = resultCommunityList - .filter("resultId = '50|pending_org_::82f63b2d21ae88596b9d8991780e9888'") - .first().getCommunityList(); + .filter("resultId = '50|pending_org_::82f63b2d21ae88596b9d8991780e9888'") + .first() + .getCommunityList(); Assertions.assertEquals(1, communities.size()); Assertions.assertEquals("dh-ch", communities.get(0)); } - }