forked from D-Net/dnet-hadoop
code formatting
This commit is contained in:
parent
5c7f7fb3b8
commit
e4b814b3f1
|
@ -7,7 +7,6 @@ import java.io.IOException;
|
|||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import eu.dnetlib.dhp.collection.plugin.zenodo.CollectZenodoDumpCollectorPlugin;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
|
@ -29,6 +28,7 @@ import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin;
|
|||
import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
|
||||
import eu.dnetlib.dhp.collection.plugin.osf.OsfPreprintsCollectorPlugin;
|
||||
import eu.dnetlib.dhp.collection.plugin.rest.RestCollectorPlugin;
|
||||
import eu.dnetlib.dhp.collection.plugin.zenodo.CollectZenodoDumpCollectorPlugin;
|
||||
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
||||
import eu.dnetlib.dhp.common.collection.CollectorException;
|
||||
import eu.dnetlib.dhp.common.collection.HttpClientParams;
|
||||
|
|
|
@ -503,7 +503,6 @@ case object Crossref2Oaf {
|
|||
)
|
||||
}
|
||||
|
||||
|
||||
if (doi.startsWith("10.3410") || doi.startsWith("10.12703"))
|
||||
instance.setHostedby(
|
||||
OafMapperUtils.keyValue(OafMapperUtils.createOpenaireId(10, "openaire____::H1Connect", true), "H1Connect")
|
||||
|
@ -556,14 +555,18 @@ case object Crossref2Oaf {
|
|||
result
|
||||
}
|
||||
|
||||
|
||||
|
||||
def generateIdentifier(oaf: Result, doi: String): String = {
|
||||
val id = DHPUtils.md5(doi.toLowerCase)
|
||||
s"50|doiboost____|$id"
|
||||
}
|
||||
|
||||
private def generateAuthor(given: String, family: String, orcid: String, index: Int, affiliation: Option[List[mappingAffiliation]]): Author = {
|
||||
private def generateAuthor(
|
||||
given: String,
|
||||
family: String,
|
||||
orcid: String,
|
||||
index: Int,
|
||||
affiliation: Option[List[mappingAffiliation]]
|
||||
): Author = {
|
||||
val a = new Author
|
||||
a.setName(given)
|
||||
a.setSurname(family)
|
||||
|
@ -700,7 +703,6 @@ case object Crossref2Oaf {
|
|||
if (objectType == null)
|
||||
return resultList
|
||||
|
||||
|
||||
// If the item has a relations is-review-of, then we force it to a peer-review
|
||||
val is_review = json \ "relation" \ "is-review-of" \ "id"
|
||||
var force_to_review = false
|
||||
|
@ -713,7 +715,6 @@ case object Crossref2Oaf {
|
|||
if (typology == null)
|
||||
return List()
|
||||
|
||||
|
||||
val result = generateItemFromType(typology._2)
|
||||
if (result == null)
|
||||
return List()
|
||||
|
|
|
@ -28,17 +28,21 @@ class CrossrefMappingTest extends AbstractVocabularyTest {
|
|||
val input =
|
||||
IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/collection/crossref/issn_pub.json"), "utf-8")
|
||||
|
||||
Crossref2Oaf.convert(input, vocabularies, TransformationType.All).foreach(record => {
|
||||
Assertions.assertNotNull(record)
|
||||
})
|
||||
Crossref2Oaf
|
||||
.convert(input, vocabularies, TransformationType.All)
|
||||
.foreach(record => {
|
||||
Assertions.assertNotNull(record)
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
def mappingAffiliation(): Unit = {
|
||||
val input =
|
||||
IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/collection/crossref/affiliationTest.json"), "utf-8")
|
||||
IOUtils.toString(
|
||||
getClass.getResourceAsStream("/eu/dnetlib/dhp/collection/crossref/affiliationTest.json"),
|
||||
"utf-8"
|
||||
)
|
||||
val data = Crossref2Oaf.convert(input, vocabularies, TransformationType.OnlyResult)
|
||||
data.foreach(record => {
|
||||
Assertions.assertNotNull(record)
|
||||
|
@ -46,10 +50,10 @@ class CrossrefMappingTest extends AbstractVocabularyTest {
|
|||
val publication = record.asInstanceOf[Publication]
|
||||
publication.getAuthor.asScala.foreach(author => {
|
||||
Assertions.assertNotNull(author.getRawAffiliationString)
|
||||
Assertions.assertTrue(author.getRawAffiliationString.size()>0)
|
||||
Assertions.assertTrue(author.getRawAffiliationString.size() > 0)
|
||||
|
||||
})
|
||||
})
|
||||
})
|
||||
println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(data.head))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,16 +1,16 @@
|
|||
|
||||
package eu.dnetlib.dhp.resulttocommunityfromsemrel;
|
||||
|
||||
import static java.lang.String.join;
|
||||
|
||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||
import static java.lang.String.join;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.sql.*;
|
||||
|
@ -22,6 +22,7 @@ import com.google.gson.Gson;
|
|||
import eu.dnetlib.dhp.api.Utils;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||
|
@ -37,8 +38,7 @@ public class PrepareResultCommunitySetStep1 {
|
|||
* relation
|
||||
*/
|
||||
// TODO
|
||||
private static final String RESULT_CONTEXT_QUERY_TEMPLATE =
|
||||
"select target resultId, community_context "
|
||||
private static final String RESULT_CONTEXT_QUERY_TEMPLATE = "select target resultId, community_context "
|
||||
+ "from (select id, collect_set(co.id) community_context "
|
||||
+ " from result "
|
||||
+ " lateral view explode (context) c as co "
|
||||
|
@ -60,26 +60,26 @@ public class PrepareResultCommunitySetStep1 {
|
|||
+ "where length(co) > 0 "
|
||||
+ "group by resultId";
|
||||
|
||||
private static final String RESULT_CONTEXT_QUERY_TEMPLATE_IS_RELATED_TO =
|
||||
"select target as resultId, community_context " +
|
||||
"from resultWithContext rwc " +
|
||||
"join relatedToRelations r " +
|
||||
"join patents p " +
|
||||
"on rwc.id = r.source and r.target = p.id";
|
||||
private static final String RESULT_CONTEXT_QUERY_TEMPLATE_IS_RELATED_TO = "select target as resultId, community_context "
|
||||
+
|
||||
"from resultWithContext rwc " +
|
||||
"join relatedToRelations r " +
|
||||
"join patents p " +
|
||||
"on rwc.id = r.source and r.target = p.id";
|
||||
|
||||
private static final String RESULT_WITH_CONTEXT = "select id, collect_set(co.id) community_context \n" +
|
||||
" from result " +
|
||||
" lateral view explode (context) c as co " +
|
||||
" where lower(co.id) IN %s" +
|
||||
" group by id";
|
||||
" from result " +
|
||||
" lateral view explode (context) c as co " +
|
||||
" where lower(co.id) IN %s" +
|
||||
" group by id";
|
||||
|
||||
private static final String RESULT_PATENT = "select id " +
|
||||
" from result " +
|
||||
" where array_contains(instance.instancetype.classname, 'Patent')";
|
||||
" from result " +
|
||||
" where array_contains(instance.instancetype.classname, 'Patent')";
|
||||
|
||||
private static final String IS_RELATED_TO_RELATIONS = "select source, target " +
|
||||
" from relation " +
|
||||
" where lower(relClass) = 'isrelatedto' and datainfo.deletedbyinference = false";
|
||||
" from relation " +
|
||||
" where lower(relClass) = 'isrelatedto' and datainfo.deletedbyinference = false";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
String jsonConfiguration = IOUtils
|
||||
|
@ -107,17 +107,25 @@ public class PrepareResultCommunitySetStep1 {
|
|||
SparkConf conf = new SparkConf();
|
||||
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
|
||||
|
||||
final String allowedsemrel ="(" + join(",",
|
||||
Arrays.asList(parser.get("allowedsemrels").split(";")).stream().map(value -> "'" + value.toLowerCase() + "'")
|
||||
.toArray(String[]::new)) + ")";
|
||||
final String allowedsemrel = "(" + join(
|
||||
",",
|
||||
Arrays
|
||||
.asList(parser.get("allowedsemrels").split(";"))
|
||||
.stream()
|
||||
.map(value -> "'" + value.toLowerCase() + "'")
|
||||
.toArray(String[]::new))
|
||||
+ ")";
|
||||
log.info("allowedSemRel: {}", allowedsemrel);
|
||||
|
||||
final String baseURL = parser.get("baseURL");
|
||||
log.info("baseURL: {}", baseURL);
|
||||
|
||||
final String communityIdList = "(" + join(",", getCommunityList(baseURL).stream()
|
||||
final String communityIdList = "(" + join(
|
||||
",", getCommunityList(baseURL)
|
||||
.stream()
|
||||
.map(value -> "'" + value.toLowerCase() + "'")
|
||||
.toArray(String[]::new)) + ")";
|
||||
.toArray(String[]::new))
|
||||
+ ")";
|
||||
|
||||
final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase();
|
||||
log.info("resultType: {}", resultType);
|
||||
|
@ -161,18 +169,17 @@ public class PrepareResultCommunitySetStep1 {
|
|||
relation.createOrReplaceTempView("relation");
|
||||
|
||||
Dataset<R> result = readPath(spark, inputResultPath, resultClazz)
|
||||
.where("datainfo.deletedbyinference != true AND datainfo.invisible != true");
|
||||
.where("datainfo.deletedbyinference != true AND datainfo.invisible != true");
|
||||
result.createOrReplaceTempView("result");
|
||||
|
||||
final String outputResultPath = outputPath + "/" + resultType;
|
||||
log.info("writing output results to: {}", outputResultPath);
|
||||
|
||||
|
||||
String resultContextQuery = String
|
||||
.format(
|
||||
RESULT_CONTEXT_QUERY_TEMPLATE,
|
||||
"AND lower(co.id) IN " + communityIdList,
|
||||
"AND lower(relClass) IN " + allowedsemrel);
|
||||
"AND lower(co.id) IN " + communityIdList,
|
||||
"AND lower(relClass) IN " + allowedsemrel);
|
||||
Dataset<Row> result_context = spark.sql(resultContextQuery);
|
||||
|
||||
Dataset<Row> rwc = spark.sql(String.format(RESULT_WITH_CONTEXT, communityIdList));
|
||||
|
@ -183,18 +190,17 @@ public class PrepareResultCommunitySetStep1 {
|
|||
patents.createOrReplaceTempView("patents");
|
||||
relatedToRelations.createOrReplaceTempView("relatedTorelations");
|
||||
|
||||
|
||||
result_context = result_context.unionAll( spark.sql(RESULT_CONTEXT_QUERY_TEMPLATE_IS_RELATED_TO));
|
||||
result_context = result_context.unionAll(spark.sql(RESULT_CONTEXT_QUERY_TEMPLATE_IS_RELATED_TO));
|
||||
|
||||
result_context.createOrReplaceTempView("result_context");
|
||||
|
||||
spark
|
||||
.sql(RESULT_COMMUNITY_LIST_QUERY)
|
||||
.as(Encoders.bean(ResultCommunityList.class))
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Append)
|
||||
.json(outputResultPath);
|
||||
.sql(RESULT_COMMUNITY_LIST_QUERY)
|
||||
.as(Encoders.bean(ResultCommunityList.class))
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Append)
|
||||
.json(outputResultPath);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -77,7 +77,7 @@ public class PrepareResultCommunitySetStep2 {
|
|||
if (b == null) {
|
||||
return a;
|
||||
}
|
||||
Set<String> community_set = new HashSet<>(a.getCommunityList());
|
||||
Set<String> community_set = new HashSet<>(a.getCommunityList());
|
||||
community_set.addAll(b.getCommunityList());
|
||||
a.setCommunityList(new ArrayList<>(community_set));
|
||||
return a;
|
||||
|
|
|
@ -10,7 +10,6 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
|
@ -27,6 +26,7 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import scala.collection.Seq;
|
||||
|
||||
|
@ -279,53 +279,55 @@ public class ResultToCommunityJobTest {
|
|||
@Test
|
||||
public void prepareStep1Test() throws Exception {
|
||||
/*
|
||||
|
||||
|
||||
final String allowedsemrel = join(",", Arrays.stream(parser.get("allowedsemrels").split(";"))
|
||||
.map(value -> "'" + value.toLowerCase() + "'")
|
||||
.toArray(String[]::new));
|
||||
|
||||
log.info("allowedSemRel: {}", new Gson().toJson(allowedsemrel));
|
||||
|
||||
final String baseURL = parser.get("baseURL");
|
||||
log.info("baseURL: {}", baseURL);
|
||||
* final String allowedsemrel = join(",", Arrays.stream(parser.get("allowedsemrels").split(";")) .map(value ->
|
||||
* "'" + value.toLowerCase() + "'") .toArray(String[]::new)); log.info("allowedSemRel: {}", new
|
||||
* Gson().toJson(allowedsemrel)); final String baseURL = parser.get("baseURL"); log.info("baseURL: {}",
|
||||
* baseURL);
|
||||
*/
|
||||
PrepareResultCommunitySetStep1
|
||||
.main(
|
||||
new String[] {
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath", getClass()
|
||||
.getResource("/eu/dnetlib/dhp/resulttocommunityfromsemrel/graph")
|
||||
.getPath(),
|
||||
"-hive_metastore_uris", "",
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
|
||||
"-outputPath", workingDir.toString() + "/preparedInfo",
|
||||
"-allowedsemrels","issupplementto;issupplementedby",
|
||||
"-baseURL","https://dev-openaire.d4science.org/openaire/community/"
|
||||
});
|
||||
.main(
|
||||
new String[] {
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-sourcePath", getClass()
|
||||
.getResource("/eu/dnetlib/dhp/resulttocommunityfromsemrel/graph")
|
||||
.getPath(),
|
||||
"-hive_metastore_uris", "",
|
||||
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
|
||||
"-outputPath", workingDir.toString() + "/preparedInfo",
|
||||
"-allowedsemrels", "issupplementto;issupplementedby",
|
||||
"-baseURL", "https://dev-openaire.d4science.org/openaire/community/"
|
||||
});
|
||||
|
||||
|
||||
org.apache.spark.sql.Dataset<ResultCommunityList> resultCommunityList = spark.read().schema(Encoders.bean(ResultCommunityList.class).schema())
|
||||
.json(workingDir.toString() + "/preparedInfo/publication")
|
||||
.as(Encoders.bean(ResultCommunityList.class));
|
||||
org.apache.spark.sql.Dataset<ResultCommunityList> resultCommunityList = spark
|
||||
.read()
|
||||
.schema(Encoders.bean(ResultCommunityList.class).schema())
|
||||
.json(workingDir.toString() + "/preparedInfo/publication")
|
||||
.as(Encoders.bean(ResultCommunityList.class));
|
||||
|
||||
Assertions.assertEquals(2, resultCommunityList.count());
|
||||
Assertions.assertEquals(1,resultCommunityList.filter("resultId = '50|dedup_wf_001::06e51d2bf295531b2d2e7a1b55500783'").count());
|
||||
Assertions.assertEquals(1,resultCommunityList.filter("resultId = '50|pending_org_::82f63b2d21ae88596b9d8991780e9888'").count());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
resultCommunityList.filter("resultId = '50|dedup_wf_001::06e51d2bf295531b2d2e7a1b55500783'").count());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
resultCommunityList.filter("resultId = '50|pending_org_::82f63b2d21ae88596b9d8991780e9888'").count());
|
||||
|
||||
ArrayList<String> communities = resultCommunityList
|
||||
.filter("resultId = '50|dedup_wf_001::06e51d2bf295531b2d2e7a1b55500783'")
|
||||
.first().getCommunityList();
|
||||
.filter("resultId = '50|dedup_wf_001::06e51d2bf295531b2d2e7a1b55500783'")
|
||||
.first()
|
||||
.getCommunityList();
|
||||
Assertions.assertEquals(2, communities.size());
|
||||
Assertions.assertTrue(communities.stream().anyMatch(cid -> "beopen".equals(cid)));
|
||||
Assertions.assertTrue(communities.stream().anyMatch(cid -> "dh-ch".equals(cid)));
|
||||
|
||||
communities = resultCommunityList
|
||||
.filter("resultId = '50|pending_org_::82f63b2d21ae88596b9d8991780e9888'")
|
||||
.first().getCommunityList();
|
||||
.filter("resultId = '50|pending_org_::82f63b2d21ae88596b9d8991780e9888'")
|
||||
.first()
|
||||
.getCommunityList();
|
||||
Assertions.assertEquals(1, communities.size());
|
||||
Assertions.assertEquals("dh-ch", communities.get(0));
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue