forked from D-Net/dnet-hadoop
Merge branch 'beta' into horizontalConstraints
This commit is contained in:
commit
b60985cf68
|
@ -0,0 +1,168 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.clean.country;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.swing.text.html.Option;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* @author miriam.baglioni
|
||||
* @Date 20/07/22
|
||||
*/
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob;
|
||||
import eu.dnetlib.dhp.schema.oaf.Country;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
|
||||
public class CleanCountrySparkJob implements Serializable {
|
||||
private static final Logger log = LoggerFactory.getLogger(CleanCountrySparkJob.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
CleanContextSparkJob.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/oa/graph/input_clean_country_parameters.json"));
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
String inputPath = parser.get("inputPath");
|
||||
log.info("inputPath: {}", inputPath);
|
||||
|
||||
String workingPath = parser.get("workingPath");
|
||||
log.info("workingPath: {}", workingPath);
|
||||
|
||||
String datasourcePath = parser.get("hostedBy");
|
||||
log.info("datasourcePath: {}", datasourcePath);
|
||||
|
||||
String country = parser.get("country");
|
||||
log.info("country: {}", country);
|
||||
|
||||
String[] verifyParam = parser.get("verifyParam").split(";");
|
||||
log.info("verifyParam: {}", verifyParam);
|
||||
|
||||
String collectedfrom = parser.get("collectedfrom");
|
||||
log.info("collectedfrom: {}", collectedfrom);
|
||||
|
||||
String graphTableClassName = parser.get("graphTableClassName");
|
||||
log.info("graphTableClassName: {}", graphTableClassName);
|
||||
|
||||
Class<? extends Result> entityClazz = (Class<? extends Result>) Class.forName(graphTableClassName);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
|
||||
cleanCountry(
|
||||
spark, country, verifyParam, inputPath, entityClazz, workingPath, collectedfrom, datasourcePath);
|
||||
});
|
||||
}
|
||||
|
||||
private static <T extends Result> void cleanCountry(SparkSession spark, String country, String[] verifyParam,
|
||||
String inputPath, Class<T> entityClazz, String workingPath, String collectedfrom, String datasourcePath) {
|
||||
|
||||
List<String> hostedBy = spark
|
||||
.read()
|
||||
.textFile(datasourcePath)
|
||||
.collectAsList();
|
||||
|
||||
Dataset<T> res = spark
|
||||
.read()
|
||||
.textFile(inputPath)
|
||||
.map(
|
||||
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
|
||||
Encoders.bean(entityClazz));
|
||||
|
||||
res.map((MapFunction<T, T>) r -> {
|
||||
if (r.getInstance().stream().anyMatch(i -> hostedBy.contains(i.getHostedby().getKey())) ||
|
||||
!r.getCollectedfrom().stream().anyMatch(cf -> cf.getValue().equals(collectedfrom))) {
|
||||
return r;
|
||||
}
|
||||
|
||||
if (r
|
||||
.getPid()
|
||||
.stream()
|
||||
.anyMatch(p -> p.getQualifier().getClassid()
|
||||
.equals(PidType.doi) && pidInParam(p.getValue(), verifyParam))) {
|
||||
r
|
||||
.setCountry(
|
||||
r
|
||||
.getCountry()
|
||||
.stream()
|
||||
.filter(
|
||||
c -> toTakeCountry(c, country))
|
||||
.collect(Collectors.toList()));
|
||||
|
||||
}
|
||||
|
||||
return r;
|
||||
}, Encoders.bean(entityClazz))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingPath);
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(workingPath)
|
||||
.map(
|
||||
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
|
||||
Encoders.bean(entityClazz))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(inputPath);
|
||||
}
|
||||
|
||||
private static boolean pidInParam(String value, String[] verifyParam) {
|
||||
for (String s : verifyParam)
|
||||
if (value.startsWith(s))
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
private static boolean toTakeCountry(Country c, String country) {
|
||||
// If dataInfo is not set, or dataInfo.inferenceprovenance is not set or not present then it cannot be
|
||||
// inserted via propagation
|
||||
if (!Optional.ofNullable(c.getDataInfo()).isPresent())
|
||||
return true;
|
||||
if (!Optional.ofNullable(c.getDataInfo().getInferenceprovenance()).isPresent())
|
||||
return true;
|
||||
return !(c
|
||||
.getClassid()
|
||||
.equalsIgnoreCase(country) &&
|
||||
c.getDataInfo().getInferenceprovenance().equals("propagation"));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,108 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.clean.country;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import scala.Tuple2;
|
||||
|
||||
/**
|
||||
* @author miriam.baglioni
|
||||
* @Date 22/07/22
|
||||
*/
|
||||
public class GetDatasourceFromCountry implements Serializable {
|
||||
private static final Logger log = LoggerFactory.getLogger(GetDatasourceFromCountry.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
GetDatasourceFromCountry.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/oa/graph/input_datasource_country_parameters.json"));
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
String inputPath = parser.get("inputPath");
|
||||
log.info("inputPath: {}", inputPath);
|
||||
|
||||
String workingPath = parser.get("workingPath");
|
||||
log.info("workingPath: {}", workingPath);
|
||||
|
||||
String country = parser.get("country");
|
||||
log.info("country: {}", country);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
|
||||
getDatasourceFromCountry(spark, country, inputPath, workingPath);
|
||||
});
|
||||
}
|
||||
|
||||
private static void getDatasourceFromCountry(SparkSession spark, String country, String inputPath,
|
||||
String workingPath) {
|
||||
|
||||
Dataset<Organization> organization = spark
|
||||
.read()
|
||||
.textFile(inputPath + "/organization")
|
||||
.map(
|
||||
(MapFunction<String, Organization>) value -> OBJECT_MAPPER.readValue(value, Organization.class),
|
||||
Encoders.bean(Organization.class))
|
||||
.filter(
|
||||
(FilterFunction<Organization>) o -> !o.getDataInfo().getDeletedbyinference() &&
|
||||
o.getCountry().getClassid().length() > 0 &&
|
||||
o.getCountry().getClassid().equals(country));
|
||||
;
|
||||
|
||||
// filtering of the relations taking the non deleted by inference and those with IsProvidedBy as relclass
|
||||
Dataset<Relation> relation = spark
|
||||
.read()
|
||||
.textFile(inputPath + "/relation")
|
||||
.map(
|
||||
(MapFunction<String, Relation>) value -> OBJECT_MAPPER.readValue(value, Relation.class),
|
||||
Encoders.bean(Relation.class))
|
||||
.filter(
|
||||
(FilterFunction<Relation>) rel -> rel.getRelClass().equalsIgnoreCase(ModelConstants.IS_PROVIDED_BY) &&
|
||||
!rel.getDataInfo().getDeletedbyinference());
|
||||
|
||||
organization
|
||||
.joinWith(relation, organization.col("id").equalTo(relation.col("target")), "left")
|
||||
.map((MapFunction<Tuple2<Organization, Relation>, String>) t2 -> t2._2().getSource(), Encoders.STRING())
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingPath);
|
||||
|
||||
}
|
||||
}
|
|
@ -14,8 +14,8 @@
|
|||
<description>the address of the lookUp service</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>shouldCleanContext</name>
|
||||
<description>true if the context have to be cleaned</description>
|
||||
<name>shouldClean</name>
|
||||
<description>true if the operation of deletion of not needed values from the results have to be performed</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>contextId</name>
|
||||
|
@ -30,6 +30,22 @@
|
|||
<description>It is the constrint to be verified. This time is hardcoded as gcube and it is searched for in
|
||||
the title. If title starts with gcube than the context sobigdata will be removed by the result if present</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>verifyCountryParam</name>
|
||||
<value>10.17632;10.5061</value>
|
||||
<description>It is the constraints to be verified. This time is hardcoded as the starting doi from mendeley and dryad and it is searched for in
|
||||
the pid value. If the pid value starts with one of the two prefixes, then the country may be removed</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>country</name>
|
||||
<value>NL</value>
|
||||
<description>It is the country to be removed from the set of countries if it is present with provenance propagation. The country will not be removed if in one of the isntances there is a datasource with country `country`</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>collectedfrom</name>
|
||||
<value>NARCIS</value>
|
||||
<description>the only datasource for which the country NL will be removed from the country list</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
|
@ -296,7 +312,7 @@
|
|||
|
||||
<decision name="clean_context">
|
||||
<switch>
|
||||
<case to="fork_clean_context">${wf:conf('shouldCleanContext') eq true}</case>
|
||||
<case to="fork_clean_context">${wf:conf('shouldClean') eq true}</case>
|
||||
<default to="End"/>
|
||||
</switch>
|
||||
</decision>
|
||||
|
@ -416,7 +432,158 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="wait_clean_context" to="End"/>
|
||||
<join name="wait_clean_context" to="getHostedby"/>
|
||||
|
||||
|
||||
<action name="getHostedby">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Clean publications context</name>
|
||||
<class>eu.dnetlib.dhp.oa.graph.clean.country.GetDatasourceFromCountry</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${graphOutputPath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingDir}/working/hostedby</arg>
|
||||
<arg>--country</arg><arg>${country}</arg>
|
||||
</spark>
|
||||
<ok to="fork_clean_country"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<fork name="fork_clean_country">
|
||||
<path start="clean_publication_country"/>
|
||||
<path start="clean_dataset_country"/>
|
||||
<path start="clean_otherresearchproduct_country"/>
|
||||
<path start="clean_software_country"/>
|
||||
</fork>
|
||||
<action name="clean_publication_country">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Clean publications counmtry</name>
|
||||
<class>eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${graphOutputPath}/publication</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
||||
<arg>--workingPath</arg><arg>${workingDir}/working/publication</arg>
|
||||
<arg>--country</arg><arg>${country}</arg>
|
||||
<arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
|
||||
<arg>--datasourcePath</arg><arg>${workingDir}/working/hostedby</arg>
|
||||
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
|
||||
</spark>
|
||||
<ok to="wait_clean_context"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="clean_dataset_country">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Clean datasets Country</name>
|
||||
<class>eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${graphOutputPath}/dataset</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
||||
<arg>--workingPath</arg><arg>${workingDir}/working/dataset</arg>
|
||||
<arg>--country</arg><arg>${country}</arg>
|
||||
<arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
|
||||
<arg>--datasourcePath</arg><arg>${workingDir}/working/hostedby</arg>
|
||||
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
|
||||
</spark>
|
||||
<ok to="wait_clean_context"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="clean_otherresearchproduct_country">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Clean otherresearchproducts country</name>
|
||||
<class>eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
||||
<arg>--workingPath</arg><arg>${workingDir}/working/otherresearchproduct</arg>
|
||||
<arg>--country</arg><arg>${country}</arg>
|
||||
<arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
|
||||
<arg>--datasourcePath</arg><arg>${workingDir}/working/hostedby</arg>
|
||||
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
|
||||
</spark>
|
||||
<ok to="wait_clean_context"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="clean_software_country">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Clean softwares country</name>
|
||||
<class>eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${graphOutputPath}/software</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
||||
<arg>--workingPath</arg><arg>${workingDir}/working/software</arg>
|
||||
<arg>--country</arg><arg>${country}</arg>
|
||||
<arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
|
||||
<arg>--datasourcePath</arg><arg>${workingDir}/working/hostedby</arg>
|
||||
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
|
||||
</spark>
|
||||
<ok to="wait_clean_country"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="wait_clean_country" to="End"/>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -0,0 +1,49 @@
|
|||
[
|
||||
{
|
||||
"paramName": "issm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "when true will stop SparkSession after job execution",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "in",
|
||||
"paramLongName": "inputPath",
|
||||
"paramDescription": "the path to the graph data dump to read",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "wp",
|
||||
"paramLongName": "workingPath",
|
||||
"paramDescription": "the path to store the output graph",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "c",
|
||||
"paramLongName": "country",
|
||||
"paramDescription": "the id of the context to be removed",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "class",
|
||||
"paramLongName": "graphTableClassName",
|
||||
"paramDescription": "class name moelling the graph table",
|
||||
"paramRequired": true
|
||||
},{
|
||||
"paramName": "vf",
|
||||
"paramLongName": "verifyParam",
|
||||
"paramDescription": "the parameter to be verified to remove the country",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "cf",
|
||||
"paramLongName": "collectedfrom",
|
||||
"paramDescription": "the collectedfrom value for which we should apply the cleaning",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "hb",
|
||||
"paramLongName": "hostedBy",
|
||||
"paramDescription": "the set of datasources having the specified country in the graph searched for in the hostedby of the results",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -0,0 +1,26 @@
|
|||
[
|
||||
{
|
||||
"paramName": "issm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "when true will stop SparkSession after job execution",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "in",
|
||||
"paramLongName": "inputPath",
|
||||
"paramDescription": "the path to the graph data dump to read",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "wp",
|
||||
"paramLongName": "workingPath",
|
||||
"paramDescription": "the path to store the output graph",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "c",
|
||||
"paramLongName": "country",
|
||||
"paramDescription": "the id of the context to be removed",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -0,0 +1,150 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.clean;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* @author miriam.baglioni
|
||||
* @Date 20/07/22
|
||||
*/
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
|
||||
public class CleanCountryTest {
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
private static SparkSession spark;
|
||||
|
||||
private static Path workingDir;
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(CleanContextTest.class);
|
||||
|
||||
@BeforeAll
|
||||
public static void beforeAll() throws IOException {
|
||||
workingDir = Files.createTempDirectory(CleanCountryTest.class.getSimpleName());
|
||||
log.info("using work dir {}", workingDir);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.setAppName(CleanCountryTest.class.getSimpleName());
|
||||
|
||||
conf.setMaster("local[*]");
|
||||
conf.set("spark.driver.host", "localhost");
|
||||
conf.set("hive.metastore.local", "true");
|
||||
conf.set("spark.ui.enabled", "false");
|
||||
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
||||
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
||||
|
||||
spark = SparkSession
|
||||
.builder()
|
||||
.appName(CleanCountryTest.class.getSimpleName())
|
||||
.config(conf)
|
||||
.getOrCreate();
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void afterAll() throws IOException {
|
||||
FileUtils.deleteDirectory(workingDir.toFile());
|
||||
spark.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResultClean() throws Exception {
|
||||
final String sourcePath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/clean/publication_clean_country.json")
|
||||
.getPath();
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(sourcePath)
|
||||
.map(
|
||||
(MapFunction<String, Publication>) r -> OBJECT_MAPPER.readValue(r, Publication.class),
|
||||
Encoders.bean(Publication.class))
|
||||
.write()
|
||||
.json(workingDir.toString() + "/publication");
|
||||
|
||||
CleanCountrySparkJob.main(new String[] {
|
||||
"--isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"--inputPath", workingDir.toString() + "/publication",
|
||||
"-graphTableClassName", Publication.class.getCanonicalName(),
|
||||
"-workingPath", workingDir.toString() + "/working",
|
||||
"-country", "NL",
|
||||
"-verifyParam", "10.17632",
|
||||
"-collectedfrom", "NARCIS",
|
||||
"-hostedBy", getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy")
|
||||
.getPath()
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
JavaRDD<Publication> tmp = sc
|
||||
.textFile(workingDir.toString() + "/publication")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
|
||||
|
||||
Assertions.assertEquals(8, tmp.count());
|
||||
|
||||
// original result with NL country and doi starting with Mendely prefix, but not collectedfrom NARCIS
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439c7a95"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getCountry()
|
||||
.size());
|
||||
|
||||
// original result with NL country and pid not starting with Mendely prefix
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getCountry()
|
||||
.size());
|
||||
|
||||
// original result with NL country and doi starting with Mendely prefix and collectedfrom NARCIS but not
|
||||
// inserted with propagation
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getCountry()
|
||||
.size());
|
||||
|
||||
// original result with NL country and doi starting with Mendely prefix and collectedfrom NARCIS inserted with
|
||||
// propagation
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0,
|
||||
tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6ag"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getCountry()
|
||||
.size());
|
||||
}
|
||||
|
||||
}
|
File diff suppressed because one or more lines are too long
|
@ -92,6 +92,17 @@ compute stats indi_funded_result_with_fundref;
|
|||
--
|
||||
-- compute stats indi_result_org_collab;
|
||||
--
|
||||
create table indi_result_org_collab stored as parquet as
|
||||
with tmp as (
|
||||
select distinct ro.organization organization, ro.id from result_organization ro
|
||||
join organization o on o.id=ro.organization where o.name is not null)
|
||||
select o1.organization org1, o2.organization org2, count(o1.id) as collaborations
|
||||
from tmp as o1
|
||||
join tmp as o2 on o1.id=o2.id and o1.organization!=o2.organization
|
||||
group by org1, org2;
|
||||
|
||||
compute stats indi_result_org_collab;
|
||||
|
||||
-- create table indi_result_org_country_collab stored as parquet as
|
||||
-- with tmp as
|
||||
-- (select o.id as id, o.country , ro.id as result,r.type from organization o
|
||||
|
@ -105,6 +116,17 @@ compute stats indi_funded_result_with_fundref;
|
|||
--
|
||||
-- compute stats indi_result_org_country_collab;
|
||||
--
|
||||
create table indi_result_org_country_collab stored as parquet as
|
||||
with tmp as
|
||||
(select distinct ro.organization organization, ro.id, o.country from result_organization ro
|
||||
join organization o on o.id=ro.organization where country <> 'UNKNOWN' and o.name is not null)
|
||||
select o1.organization org1,o2.country country2, count(o1.id) as collaborations
|
||||
from tmp as o1 join tmp as o2 on o1.id=o2.id
|
||||
where o1.id=o2.id and o1.country!=o2.country
|
||||
group by o1.organization, o1.id, o2.country;
|
||||
|
||||
compute stats indi_result_org_country_collab;
|
||||
|
||||
-- create table indi_result_org_collab stored as parquet as
|
||||
-- with tmp as
|
||||
-- (select o.id, ro.id as result,r.type from organization o
|
||||
|
@ -166,6 +188,19 @@ compute stats indi_funder_country_collab;
|
|||
--
|
||||
-- compute stats indi_result_country_collab;
|
||||
|
||||
create table indi_result_country_collab stored as parquet as
|
||||
with tmp as
|
||||
(select distinct country, ro.id as result from organization o
|
||||
join result_organization ro on o.id=ro.organization
|
||||
where country <> 'UNKNOWN' and o.name is not null)
|
||||
select o1.country country1, o2.country country2, count(o1.result) as collaborations
|
||||
from tmp as o1
|
||||
join tmp as o2 on o1.result=o2.result
|
||||
where o1.country<>o2.country
|
||||
group by o1.country, o2.country;
|
||||
|
||||
compute stats indi_result_country_collab;
|
||||
|
||||
---- Sprint 4 ----
|
||||
create table indi_pub_diamond stored as parquet as
|
||||
select distinct pd.id, coalesce(in_diamond_journal, 0) as in_diamond_journal
|
||||
|
|
|
@ -23,7 +23,7 @@ create table TARGET.result stored as parquet as
|
|||
'openorgs____::d2a09b9d5eabb10c95f9470e172d05d2', --??? Not exists ??
|
||||
'openorgs____::d169c7407dd417152596908d48c11460', --Masaryk University
|
||||
'openorgs____::1ec924b1759bb16d0a02f2dad8689b21', --University of Belgrade
|
||||
'openorgs____::2fb1e47b4612688d9de9169d579939a7', --University of Helsinki
|
||||
'openorgs____::0ae431b820e4c33db8967fbb2b919150', --University of Helsinki
|
||||
'openorgs____::759d59f05d77188faee99b7493b46805', --University of Minho
|
||||
'openorgs____::cad284878801b9465fa51a95b1d779db', --Universidad Politécnica de Madrid
|
||||
'openorgs____::eadc8da90a546e98c03f896661a2e4d4', --University of Göttingen
|
||||
|
@ -48,7 +48,8 @@ create table TARGET.result stored as parquet as
|
|||
'openorgs____::4ac562f0376fce3539504567649cb373', -- University of Patras
|
||||
'openorgs____::3e8d1f8c3f6cd7f418b09f1f58b4873b', -- Aristotle University of Thessaloniki
|
||||
'openorgs____::3fcef6e1c469c10f2a84b281372c9814', -- World Bank
|
||||
'openorgs____::1698a2eb1885ef8adb5a4a969e745ad3' -- École des Ponts ParisTech
|
||||
'openorgs____::1698a2eb1885ef8adb5a4a969e745ad3', -- École des Ponts ParisTech
|
||||
'openorgs____::e15adb13c4dadd49de4d35c39b5da93a' -- Nanyang Technological University
|
||||
) )) foo;
|
||||
compute stats TARGET.result;
|
||||
|
||||
|
@ -154,50 +155,44 @@ create table TARGET.project_results stored as parquet as select id as result, pr
|
|||
compute stats TARGET.project_results;
|
||||
|
||||
-- indicators
|
||||
-- Sprint 1 ----
|
||||
create table TARGET.indi_pub_green_oa stored as parquet as select * from SOURCE.indi_pub_green_oa orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
compute stats TARGET.indi_pub_green_oa;
|
||||
create table TARGET.indi_pub_grey_lit stored as parquet as select * from SOURCE.indi_pub_grey_lit orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
compute stats TARGET.indi_pub_grey_lit;
|
||||
create table TARGET.indi_pub_doi_from_crossref stored as parquet as select * from SOURCE.indi_pub_doi_from_crossref orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
compute stats TARGET.indi_pub_doi_from_crossref;
|
||||
create table TARGET.indi_pub_gold_oa stored as parquet as select * from SOURCE.indi_pub_gold_oa orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
compute stats TARGET.indi_pub_gold_oa;
|
||||
--create table TARGET.indi_datasets_gold_oa stored as parquet as select * from SOURCE.indi_datasets_gold_oa orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
--compute stats TARGET.indi_datasets_gold_oa;
|
||||
--create table TARGET.indi_software_gold_oa stored as parquet as select * from SOURCE.indi_software_gold_oa orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
--compute stats TARGET.indi_software_gold_oa;
|
||||
create table TARGET.indi_pub_has_abstract stored as parquet as select * from SOURCE.indi_pub_has_abstract orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
compute stats TARGET.indi_pub_has_abstract;
|
||||
-- Sprint 2 ----
|
||||
create table TARGET.indi_result_has_cc_licence stored as parquet as select * from SOURCE.indi_result_has_cc_licence orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
compute stats TARGET.indi_result_has_cc_licence;
|
||||
create table TARGET.indi_result_has_cc_licence_url stored as parquet as select * from SOURCE.indi_result_has_cc_licence_url orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
compute stats TARGET.indi_result_has_cc_licence_url;
|
||||
|
||||
create view TARGET.indi_funder_country_collab as select * from SOURCE.indi_funder_country_collab;
|
||||
create view TARGET.indi_project_collab_org as select * from SOURCE.indi_project_collab_org;
|
||||
create view TARGET.indi_project_collab_org_country as select * from SOURCE.indi_project_collab_org_country;
|
||||
|
||||
create table TARGET.indi_pub_has_abstract stored as parquet as select * from SOURCE.indi_pub_has_abstract orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
compute stats TARGET.indi_pub_has_abstract;
|
||||
create table TARGET.indi_result_with_orcid stored as parquet as select * from SOURCE.indi_result_with_orcid orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
compute stats TARGET.indi_result_with_orcid;
|
||||
---- Sprint 3 ----
|
||||
create table TARGET.indi_funded_result_with_fundref stored as parquet as select * from SOURCE.indi_funded_result_with_fundref orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
compute stats TARGET.indi_funded_result_with_fundref;
|
||||
create view TARGET.indi_result_org_collab as select * from SOURCE.indi_result_org_collab;
|
||||
create view TARGET.indi_result_org_country_collab as select * from SOURCE.indi_result_org_country_collab;
|
||||
create view TARGET.indi_project_collab_org as select * from SOURCE.indi_project_collab_org;
|
||||
create view TARGET.indi_project_collab_org_country as select * from SOURCE.indi_project_collab_org_country;
|
||||
create view TARGET.indi_funder_country_collab as select * from SOURCE.indi_funder_country_collab;
|
||||
create view TARGET.indi_result_country_collab as select * from SOURCE.indi_result_country_collab;
|
||||
---- Sprint 4 ----
|
||||
create table TARGET.indi_pub_diamond stored as parquet as select * from SOURCE.indi_pub_diamond orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
compute stats TARGET.indi_pub_diamond;
|
||||
create table TARGET.indi_pub_hybrid stored as parquet as select * from SOURCE.indi_pub_hybrid orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
compute stats TARGET.indi_pub_hybrid;
|
||||
create table TARGET.indi_pub_in_transformative stored as parquet as select * from SOURCE.indi_pub_in_transformative orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
compute stats TARGET.indi_pub_in_transformative;
|
||||
create table TARGET.indi_pub_closed_other_open stored as parquet as select * from SOURCE.indi_pub_closed_other_open orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
compute stats TARGET.indi_pub_closed_other_open;
|
||||
|
||||
---- Sprint 5 ----
|
||||
create table TARGET.indi_result_no_of_copies stored as parquet as select * from SOURCE.indi_result_no_of_copies orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
compute stats TARGET.indi_result_no_of_copies;
|
||||
|
||||
create view TARGET.indi_org_findable as select * from SOURCE.indi_org_findable;
|
||||
create view TARGET.indi_org_openess as select * from SOURCE.indi_org_openess;
|
||||
---- Sprint 6 ----
|
||||
create table TARGET.indi_pub_hybrid_oa_with_cc stored as parquet as select * from SOURCE.indi_pub_hybrid_oa_with_cc orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
compute stats TARGET.indi_pub_hybrid_oa_with_cc;
|
||||
|
||||
create table TARGET.indi_pub_downloads stored as parquet as select * from SOURCE.indi_pub_downloads orig where exists (select 1 from TARGET.result r where r.id=orig.result_id);
|
||||
compute stats TARGET.indi_pub_downloads;
|
||||
create table TARGET.indi_pub_downloads_datasource stored as parquet as select * from SOURCE.indi_pub_downloads_datasource orig where exists (select 1 from TARGET.result r where r.id=orig.result_id);
|
||||
|
@ -206,6 +201,28 @@ create table TARGET.indi_pub_downloads_year stored as parquet as select * from S
|
|||
compute stats TARGET.indi_pub_downloads_year;
|
||||
create table TARGET.indi_pub_downloads_datasource_year stored as parquet as select * from SOURCE.indi_pub_downloads_datasource_year orig where exists (select 1 from TARGET.result r where r.id=orig.result_id);
|
||||
compute stats TARGET.indi_pub_downloads_datasource_year;
|
||||
---- Sprint 7 ----
|
||||
create table TARGET.indi_pub_gold_oa stored as parquet as select * from SOURCE.indi_pub_gold_oa orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
compute stats TARGET.indi_pub_gold_oa;
|
||||
create table TARGET.indi_pub_hybrid stored as parquet as select * from SOURCE.indi_pub_hybrid orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
compute stats TARGET.indi_pub_hybrid;
|
||||
create view TARGET.indi_org_fairness as select * from SOURCE.indi_org_fairness;
|
||||
create view TARGET.indi_org_fairness_pub_pr as select * from SOURCE.indi_org_fairness_pub_pr;
|
||||
create view TARGET.indi_org_fairness_pub_year as select * from SOURCE.indi_org_fairness_pub_year;
|
||||
create view TARGET.indi_org_fairness_pub as select * from SOURCE.indi_org_fairness_pub;
|
||||
create view TARGET.indi_org_fairness_year as select * from SOURCE.indi_org_fairness_year;
|
||||
create view TARGET.indi_org_findable_year as select * from SOURCE.indi_org_findable_year;
|
||||
create view TARGET.indi_org_findable as select * from SOURCE.indi_org_findable;
|
||||
create view TARGET.indi_org_openess as select * from SOURCE.indi_org_openess;
|
||||
create view TARGET.indi_org_openess_year as select * from SOURCE.indi_org_openess_year;
|
||||
create table TARGET.indi_pub_has_preprint stored as parquet as select * from SOURCE.indi_pub_has_preprint orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
create table TARGET.indi_pub_in_subscribed stored as parquet as select * from SOURCE.indi_pub_in_subscribed orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
create table TARGET.indi_result_with_pid stored as parquet as select * from SOURCE.indi_result_with_pid orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
|
||||
--create table TARGET.indi_datasets_gold_oa stored as parquet as select * from SOURCE.indi_datasets_gold_oa orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
--compute stats TARGET.indi_datasets_gold_oa;
|
||||
--create table TARGET.indi_software_gold_oa stored as parquet as select * from SOURCE.indi_software_gold_oa orig where exists (select 1 from TARGET.result r where r.id=orig.id);
|
||||
--compute stats TARGET.indi_software_gold_oa;
|
||||
|
||||
--denorm
|
||||
alter table TARGET.result rename to TARGET.res_tmp;
|
||||
|
|
Loading…
Reference in New Issue