master #59

Closed
claudio.atzori wants to merge 3221 commits from master into stable_ids
23 changed files with 967 additions and 94 deletions
Showing only changes of commit 0b3e44e521 - Show all commits

View File

@ -5,6 +5,8 @@ import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.avro.generic.GenericData;
import com.google.gson.Gson; import com.google.gson.Gson;
/** Created by miriam on 01/08/2018. */ /** Created by miriam on 01/08/2018. */
@ -14,6 +16,7 @@ public class Community implements Serializable {
private List<String> subjects = new ArrayList<>(); private List<String> subjects = new ArrayList<>();
private List<Provider> providers = new ArrayList<>(); private List<Provider> providers = new ArrayList<>();
private List<ZenodoCommunity> zenodoCommunities = new ArrayList<>(); private List<ZenodoCommunity> zenodoCommunities = new ArrayList<>();
private SelectionConstraints constraints = new SelectionConstraints();
public String toJson() { public String toJson() {
final Gson g = new Gson(); final Gson g = new Gson();
@ -57,4 +60,12 @@ public class Community implements Serializable {
public void setZenodoCommunities(List<ZenodoCommunity> zenodoCommunities) { public void setZenodoCommunities(List<ZenodoCommunity> zenodoCommunities) {
this.zenodoCommunities = zenodoCommunities; this.zenodoCommunities = zenodoCommunities;
} }
public SelectionConstraints getConstraints() {
return constraints;
}
public void setConstraints(SelectionConstraints constraints) {
this.constraints = constraints;
}
} }

View File

@ -24,6 +24,8 @@ public class CommunityConfiguration implements Serializable {
private Map<String, List<Pair<String, SelectionConstraints>>> datasourceMap = new HashMap<>(); private Map<String, List<Pair<String, SelectionConstraints>>> datasourceMap = new HashMap<>();
// map zenodocommunityid -> communityid // map zenodocommunityid -> communityid
private Map<String, List<Pair<String, SelectionConstraints>>> zenodocommunityMap = new HashMap<>(); private Map<String, List<Pair<String, SelectionConstraints>>> zenodocommunityMap = new HashMap<>();
// map communityid -> selectionconstraints
private Map<String, SelectionConstraints> selectionConstraintsMap = new HashMap<>();
public Map<String, List<Pair<String, SelectionConstraints>>> getSubjectMap() { public Map<String, List<Pair<String, SelectionConstraints>>> getSubjectMap() {
return subjectMap; return subjectMap;
@ -51,6 +53,14 @@ public class CommunityConfiguration implements Serializable {
this.zenodocommunityMap = zenodocommunityMap; this.zenodocommunityMap = zenodocommunityMap;
} }
public Map<String, SelectionConstraints> getSelectionConstraintsMap() {
return selectionConstraintsMap;
}
public void setSelectionConstraintsMap(Map<String, SelectionConstraints> selectionConstraintsMap) {
this.selectionConstraintsMap = selectionConstraintsMap;
}
CommunityConfiguration(final Map<String, Community> communities) { CommunityConfiguration(final Map<String, Community> communities) {
this.communities = communities; this.communities = communities;
init(); init();
@ -67,6 +77,9 @@ public class CommunityConfiguration implements Serializable {
if (zenodocommunityMap == null) { if (zenodocommunityMap == null) {
zenodocommunityMap = Maps.newHashMap(); zenodocommunityMap = Maps.newHashMap();
} }
if (selectionConstraintsMap == null) {
selectionConstraintsMap = Maps.newHashMap();
}
for (Community c : getCommunities().values()) { for (Community c : getCommunities().values()) {
// get subjects // get subjects
@ -87,6 +100,7 @@ public class CommunityConfiguration implements Serializable {
new Pair<>(id, zc.getSelCriteria()), new Pair<>(id, zc.getSelCriteria()),
zenodocommunityMap); zenodocommunityMap);
} }
selectionConstraintsMap.put(id, c.getConstraints());
} }
} }

View File

@ -85,9 +85,22 @@ public class CommunityConfigurationFactory {
c.setSubjects(parseSubjects(node)); c.setSubjects(parseSubjects(node));
c.setProviders(parseDatasources(node)); c.setProviders(parseDatasources(node));
c.setZenodoCommunities(parseZenodoCommunities(node)); c.setZenodoCommunities(parseZenodoCommunities(node));
c.setConstraints(parseConstrains(node));
return c; return c;
} }
private static SelectionConstraints parseConstrains(Node node) {
Node aconstraints = node.selectSingleNode("./advancedConstraints");
if (aconstraints == null) {
return null;
}
SelectionConstraints selectionConstraints = new Gson()
.fromJson(aconstraints.getText(), SelectionConstraints.class);
selectionConstraints.setSelection(resolver);
return selectionConstraints;
}
private static List<String> parseSubjects(final Node node) { private static List<String> parseSubjects(final Node node) {
final List<String> subjects = Lists.newArrayList(); final List<String> subjects = Lists.newArrayList();

View File

@ -11,6 +11,7 @@ public class Constraint implements Serializable {
private String verb; private String verb;
private String field; private String field;
private String value; private String value;
// private String element;
private Selection selection; private Selection selection;
public String getVerb() { public String getVerb() {
@ -50,4 +51,12 @@ public class Constraint implements Serializable {
public boolean verifyCriteria(String metadata) { public boolean verifyCriteria(String metadata) {
return selection.apply(metadata); return selection.apply(metadata);
} }
// public String getElement() {
// return element;
// }
//
// public void setElement(String element) {
// this.element = element;
// }
} }

View File

@ -18,6 +18,8 @@ public class QueryInformationSystem {
+ " let $datasources := $x//CONFIGURATION/context/category[./@id=concat($x//CONFIGURATION/context/@id,'::contentproviders')]/concept " + " let $datasources := $x//CONFIGURATION/context/category[./@id=concat($x//CONFIGURATION/context/@id,'::contentproviders')]/concept "
+ " let $organizations := $x//CONFIGURATION/context/category[./@id=concat($x//CONFIGURATION/context/@id,'::resultorganizations')]/concept " + " let $organizations := $x//CONFIGURATION/context/category[./@id=concat($x//CONFIGURATION/context/@id,'::resultorganizations')]/concept "
+ " let $communities := $x//CONFIGURATION/context/category[./@id=concat($x//CONFIGURATION/context/@id,'::zenodocommunities')]/concept " + " let $communities := $x//CONFIGURATION/context/category[./@id=concat($x//CONFIGURATION/context/@id,'::zenodocommunities')]/concept "
+ " let $fos := $x//CONFIGURATION/context/param[./@name='fos']/text() "
+ " let $sdg := $x//CONFIGURATION/context/param[./@name='sdg']/text() "
+ +
"let $zenodo := $x//param[./@name='zenodoCommunity']/text() " "let $zenodo := $x//param[./@name='zenodoCommunity']/text() "
+ " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] and $x//context/param[./@name = 'status']/text() != 'hidden' " + " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] and $x//context/param[./@name = 'status']/text() != 'hidden' "
@ -28,6 +30,12 @@ public class QueryInformationSystem {
+ " {for $y in tokenize($subj,',') " + " {for $y in tokenize($subj,',') "
+ " return " + " return "
+ " <subject>{$y}</subject>} " + " <subject>{$y}</subject>} "
+ " {for $y in tokenize($fos,',') "
+ " return "
+ " <subject>{$y}</subject>} "
+ " {for $y in tokenize($sdg,',') "
+ " return "
+ " <subject>{$y}</subject>} "
+ " </subjects> " + " </subjects> "
+ " <datasources> " + " <datasources> "
+ " {for $d in $datasources " + " {for $d in $datasources "
@ -61,6 +69,9 @@ public class QueryInformationSystem {
+ " </selcriteria> " + " </selcriteria> "
+ " </zenodocommunity>} " + " </zenodocommunity>} "
+ " </zenodocommunities> " + " </zenodocommunities> "
+ "<advancedConstraint>"
+ "{$x//CONFIGURATION/context/param[./@name='advancedConstraint']/text()} "
+ "</advancedConstraint>"
+ " </community>"; + " </community>";
public static CommunityConfiguration getCommunityConfiguration(final String isLookupUrl) public static CommunityConfiguration getCommunityConfiguration(final String isLookupUrl)

View File

@ -15,7 +15,10 @@ import com.google.gson.Gson;
import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
/** Created by miriam on 02/08/2018. */ /** Created by miriam on 02/08/2018. */
public class ResultTagger implements Serializable { public class ResultTagger implements Serializable {
@ -95,13 +98,6 @@ public class ResultTagger implements Serializable {
} }
// result
// .getInstance()
// .stream()
// .map(i -> new Pair<>(i.getCollectedfrom().getKey(), i.getHostedby().getKey()))
// .flatMap(p -> Stream.of(p.getFst(), p.getSnd()))
// .map(s -> StringUtils.substringAfter(s, "|"))
// .collect(Collectors.toCollection(HashSet::new))
tmp tmp
.forEach( .forEach(
dsId -> datasources dsId -> datasources
@ -135,6 +131,25 @@ public class ResultTagger implements Serializable {
communities.addAll(czenodo); communities.addAll(czenodo);
/* Tagging for Advanced Constraints */
final Set<String> aconstraints = new HashSet<>();
conf
.getSelectionConstraintsMap()
.keySet()
.forEach(communityId -> {
if (conf.getSelectionConstraintsMap().get(communityId) != null &&
conf
.getSelectionConstraintsMap()
.get(communityId)
.getCriteria()
.stream()
.anyMatch(crit -> crit.verifyCriteria(param)))
aconstraints.add(communityId);
});
communities.addAll(aconstraints);
clearContext(result); clearContext(result);
/* Verify if there is something to bulktag */ /* Verify if there is something to bulktag */
@ -152,30 +167,51 @@ public class ResultTagger implements Serializable {
dataInfoList = new ArrayList<>(); dataInfoList = new ArrayList<>();
c.setDataInfo(dataInfoList); c.setDataInfo(dataInfoList);
} }
if (subjects.contains(c.getId())) if (subjects.contains(c))
dataInfoList dataInfoList
.add( .add(
getDataInfo( OafMapperUtils
BULKTAG_DATA_INFO_TYPE, .dataInfo(
CLASS_ID_SUBJECT, false, BULKTAG_DATA_INFO_TYPE, true, false,
CLASS_NAME_BULKTAG_SUBJECT, OafMapperUtils
TAGGING_TRUST)); .qualifier(
if (datasources.contains(c.getId())) CLASS_ID_SUBJECT, CLASS_NAME_BULKTAG_SUBJECT, DNET_PROVENANCE_ACTIONS,
DNET_PROVENANCE_ACTIONS),
TAGGING_TRUST));
if (datasources.contains(c))
dataInfoList dataInfoList
.add( .add(
getDataInfo( OafMapperUtils
BULKTAG_DATA_INFO_TYPE, .dataInfo(
CLASS_ID_DATASOURCE, false, BULKTAG_DATA_INFO_TYPE, true, false,
CLASS_NAME_BULKTAG_DATASOURCE, OafMapperUtils
TAGGING_TRUST)); .qualifier(
if (czenodo.contains(c.getId())) CLASS_ID_DATASOURCE, CLASS_NAME_BULKTAG_DATASOURCE, DNET_PROVENANCE_ACTIONS,
DNET_PROVENANCE_ACTIONS),
TAGGING_TRUST));
if (czenodo.contains(c))
dataInfoList dataInfoList
.add( .add(
getDataInfo( OafMapperUtils
BULKTAG_DATA_INFO_TYPE, .dataInfo(
CLASS_ID_CZENODO, false, BULKTAG_DATA_INFO_TYPE, true, false,
CLASS_NAME_BULKTAG_ZENODO, OafMapperUtils
TAGGING_TRUST)); .qualifier(
CLASS_ID_CZENODO, CLASS_NAME_BULKTAG_ZENODO, DNET_PROVENANCE_ACTIONS,
DNET_PROVENANCE_ACTIONS),
TAGGING_TRUST));
if (aconstraints.contains(c))
dataInfoList
.add(
OafMapperUtils
.dataInfo(
false, BULKTAG_DATA_INFO_TYPE, true, false,
OafMapperUtils
.qualifier(
CLASS_ID_ADVANCED_CONSTRAINT, CLASS_NAME_BULKTAG_ADVANCED_CONSTRAINT,
DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS),
TAGGING_TRUST));
} }
}); });
@ -196,27 +232,48 @@ public class ResultTagger implements Serializable {
if (subjects.contains(c)) if (subjects.contains(c))
dataInfoList dataInfoList
.add( .add(
getDataInfo( OafMapperUtils
BULKTAG_DATA_INFO_TYPE, .dataInfo(
CLASS_ID_SUBJECT, false, BULKTAG_DATA_INFO_TYPE, true, false,
CLASS_NAME_BULKTAG_SUBJECT, OafMapperUtils
TAGGING_TRUST)); .qualifier(
CLASS_ID_SUBJECT, CLASS_NAME_BULKTAG_SUBJECT, DNET_PROVENANCE_ACTIONS,
DNET_PROVENANCE_ACTIONS),
TAGGING_TRUST));
if (datasources.contains(c)) if (datasources.contains(c))
dataInfoList dataInfoList
.add( .add(
getDataInfo( OafMapperUtils
BULKTAG_DATA_INFO_TYPE, .dataInfo(
CLASS_ID_DATASOURCE, false, BULKTAG_DATA_INFO_TYPE, true, false,
CLASS_NAME_BULKTAG_DATASOURCE, OafMapperUtils
TAGGING_TRUST)); .qualifier(
CLASS_ID_DATASOURCE, CLASS_NAME_BULKTAG_DATASOURCE,
DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS),
TAGGING_TRUST));
if (czenodo.contains(c)) if (czenodo.contains(c))
dataInfoList dataInfoList
.add( .add(
getDataInfo( OafMapperUtils
BULKTAG_DATA_INFO_TYPE, .dataInfo(
CLASS_ID_CZENODO, false, BULKTAG_DATA_INFO_TYPE, true, false,
CLASS_NAME_BULKTAG_ZENODO, OafMapperUtils
TAGGING_TRUST)); .qualifier(
CLASS_ID_CZENODO, CLASS_NAME_BULKTAG_ZENODO, DNET_PROVENANCE_ACTIONS,
DNET_PROVENANCE_ACTIONS),
TAGGING_TRUST));
if (aconstraints.contains(c))
dataInfoList
.add(
OafMapperUtils
.dataInfo(
false, BULKTAG_DATA_INFO_TYPE, true, false,
OafMapperUtils
.qualifier(
CLASS_ID_ADVANCED_CONSTRAINT, CLASS_NAME_BULKTAG_ADVANCED_CONSTRAINT,
DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS),
TAGGING_TRUST));
context.setDataInfo(dataInfoList); context.setDataInfo(dataInfoList);
return context; return context;
}) })
@ -226,22 +283,4 @@ public class ResultTagger implements Serializable {
return result; return result;
} }
public static DataInfo getDataInfo(
String inference_provenance, String inference_class_id, String inference_class_name, String trust) {
DataInfo di = new DataInfo();
di.setInferred(true);
di.setInferenceprovenance(inference_provenance);
di.setProvenanceaction(getQualifier(inference_class_id, inference_class_name));
di.setTrust(trust);
return di;
}
public static Qualifier getQualifier(String inference_class_id, String inference_class_name) {
Qualifier pa = new Qualifier();
pa.setClassid(inference_class_id);
pa.setClassname(inference_class_name);
pa.setSchemeid(DNET_PROVENANCE_ACTIONS);
pa.setSchemename(DNET_PROVENANCE_ACTIONS);
return pa;
}
} }

View File

@ -11,12 +11,14 @@ public class TaggingConstants {
public static final String CLASS_ID_SUBJECT = "community:subject"; public static final String CLASS_ID_SUBJECT = "community:subject";
public static final String CLASS_ID_DATASOURCE = "community:datasource"; public static final String CLASS_ID_DATASOURCE = "community:datasource";
public static final String CLASS_ID_CZENODO = "community:zenodocommunity"; public static final String CLASS_ID_CZENODO = "community:zenodocommunity";
public static final String CLASS_ID_ADVANCED_CONSTRAINT = "community:advconstraint";
public static final String ZENODO_COMMUNITY_INDICATOR = "zenodo.org/communities/"; public static final String ZENODO_COMMUNITY_INDICATOR = "zenodo.org/communities/";
public static final String CLASS_NAME_BULKTAG_SUBJECT = "Bulktagging for Community - Subject"; public static final String CLASS_NAME_BULKTAG_SUBJECT = "Bulktagging for Community - Subject";
public static final String CLASS_NAME_BULKTAG_DATASOURCE = "Bulktagging for Community - Datasource"; public static final String CLASS_NAME_BULKTAG_DATASOURCE = "Bulktagging for Community - Datasource";
public static final String CLASS_NAME_BULKTAG_ZENODO = "Bulktagging for Community - Zenodo"; public static final String CLASS_NAME_BULKTAG_ZENODO = "Bulktagging for Community - Zenodo";
public static final String CLASS_NAME_BULKTAG_ADVANCED_CONSTRAINT = "Bulktagging for Community - Advanced Constraints";
public static final String TAGGING_TRUST = "0.8"; public static final String TAGGING_TRUST = "0.8";
} }

View File

@ -121,8 +121,7 @@ public class SparkEoscBulkTag implements Serializable {
.getInstance() .getInstance()
.stream() .stream()
.anyMatch( .anyMatch(
i -> (hostedByList.contains(i.getHostedby().getKey())) || i -> (hostedByList.contains(i.getHostedby().getKey())))
(value.getEoscifguidelines() != null && value.getEoscifguidelines().size() > 0))
&& &&
!value.getContext().stream().anyMatch(c -> c.getId().equals("eosc"))) { !value.getContext().stream().anyMatch(c -> c.getId().equals("eosc"))) {
Context context = new Context(); Context context = new Context();

View File

@ -6,6 +6,10 @@ import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.ZENODO_COMMUNITY
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -23,11 +27,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.schema.oaf.Dataset; import eu.dnetlib.dhp.bulktag.community.ProtoMap;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Software;
public class BulkTagJobTest { public class BulkTagJobTest {
@ -39,7 +44,8 @@ public class BulkTagJobTest {
+ " \"title\" : \"$['title'][*]['value']\"," + " \"title\" : \"$['title'][*]['value']\","
+ " \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\"," + " \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\","
+ " \"contributor\" : \"$['contributor'][*]['value']\"," + " \"contributor\" : \"$['contributor'][*]['value']\","
+ " \"description\" : \"$['description'][*]['value']\"}"; + " \"description\" : \"$['description'][*]['value']\", "
+ " \"subject\" :\"$['subject'][*]['value']\" }";
private static SparkSession spark; private static SparkSession spark;
@ -763,10 +769,28 @@ public class BulkTagJobTest {
org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query); org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
idExplodeCommunity.show(false); idExplodeCommunity.show(false);
Assertions.assertEquals(3, idExplodeCommunity.count()); Assertions.assertEquals(4, idExplodeCommunity.count());
Assertions Assertions
.assertEquals( .assertEquals(
3, idExplodeCommunity.filter("provenance = 'community:datasource'").count()); 3, idExplodeCommunity.filter("provenance = 'community:datasource'").count());
Assertions
.assertEquals(
1, idExplodeCommunity.filter("provenance = 'community:advconstraint'").count());
} }
// @Test
// void test1(){
// ProtoMap params = new Gson().fromJson(pathMap, ProtoMap.class);
// HashMap<String, String> param = new HashMap<>();
// for (String key : params.keySet()) {
// try {
// param.put(key, jsonContext.read(params.get(key)));
// } catch (com.jayway.jsonpath.PathNotFoundException e) {
// param.put(key, new ArrayList<>());
// }
// }
// return param;
// }
// }
} }

View File

@ -1193,6 +1193,9 @@
<organizations/> <organizations/>
</community> </community>
<community id="science-innovation-policy"> <community id="science-innovation-policy">
<advancedConstraints>{"criteria":[{"constraint":[{"verb":"equals_ignorecase","field":"subject","value":"ciencias de la comunicación"},
{"verb":"equals","field":"subject","value":"Miriam"}]},
{"constraint":[{"verb":"equals","field":"subject","value":"miriam"}]}]}</advancedConstraints>
<subjects> <subjects>
<subject>Sustainability-oriented science policy</subject> <subject>Sustainability-oriented science policy</subject>
<subject> STI policies</subject> <subject> STI policies</subject>
@ -1316,7 +1319,7 @@
<openaireId>opendoar____::358aee4cc897452c00244351e4d91f69</openaireId> <openaireId>opendoar____::358aee4cc897452c00244351e4d91f69</openaireId>
<selcriteria>{"criteria":[{"constraint":[{"verb":"contains_ignorecase","field":"title","value":"COVID-19"}]}, <selcriteria>{"criteria":[{"constraint":[{"verb":"contains_ignorecase","field":"title","value":"COVID-19"}]},
{"constraint":[{"verb":"contains_ignorecase","field":"title","value":"SARS-CoV-2"}]}, {"constraint":[{"verb":"contains_ignorecase","field":"title","value":"SARS-CoV-2"}]},
{"constraint":[{"verb":"contains_ignorecase","field":"title","value":"2019-nCoV"}]}]} {"constraint":[{"verb":"contains_ignorecase","field":"title","value":"2019-nCoV"}}]}
</selcriteria> </selcriteria>
</datasource> </datasource>
<datasource> <datasource>

View File

@ -0,0 +1,171 @@
package eu.dnetlib.dhp.oa.graph.clean.country;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.swing.text.html.Option;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author miriam.baglioni
* @Date 20/07/22
*/
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob;
import eu.dnetlib.dhp.schema.oaf.Country;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
public class CleanCountrySparkJob implements Serializable {
private static final Logger log = LoggerFactory.getLogger(CleanCountrySparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
CleanContextSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/input_clean_country_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
String datasourcePath = parser.get("hostedBy");
log.info("datasourcePath: {}", datasourcePath);
String country = parser.get("country");
log.info("country: {}", country);
String[] verifyParam = parser.get("verifyParam").split(";");
log.info("verifyParam: {}", verifyParam);
String collectedfrom = parser.get("collectedfrom");
log.info("collectedfrom: {}", collectedfrom);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
Class<? extends Result> entityClazz = (Class<? extends Result>) Class.forName(graphTableClassName);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
cleanCountry(
spark, country, verifyParam, inputPath, entityClazz, workingPath, collectedfrom, datasourcePath);
});
}
private static <T extends Result> void cleanCountry(SparkSession spark, String country, String[] verifyParam,
String inputPath, Class<T> entityClazz, String workingPath, String collectedfrom, String datasourcePath) {
List<String> hostedBy = spark
.read()
.textFile(datasourcePath)
.collectAsList();
Dataset<T> res = spark
.read()
.textFile(inputPath)
.map(
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
Encoders.bean(entityClazz));
res.map((MapFunction<T, T>) r -> {
if (r.getInstance().stream().anyMatch(i -> hostedBy.contains(i.getHostedby().getKey())) ||
!r.getCollectedfrom().stream().anyMatch(cf -> cf.getValue().equals(collectedfrom))) {
return r;
}
if (r
.getPid()
.stream()
.anyMatch(
p -> p
.getQualifier()
.getClassid()
.equals(PidType.doi) && pidInParam(p.getValue(), verifyParam))) {
r
.setCountry(
r
.getCountry()
.stream()
.filter(
c -> toTakeCountry(c, country))
.collect(Collectors.toList()));
}
return r;
}, Encoders.bean(entityClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath);
spark
.read()
.textFile(workingPath)
.map(
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
Encoders.bean(entityClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(inputPath);
}
private static boolean pidInParam(String value, String[] verifyParam) {
for (String s : verifyParam)
if (value.startsWith(s))
return true;
return false;
}
private static boolean toTakeCountry(Country c, String country) {
// If dataInfo is not set, or dataInfo.inferenceprovenance is not set or not present then it cannot be
// inserted via propagation
if (!Optional.ofNullable(c.getDataInfo()).isPresent())
return true;
if (!Optional.ofNullable(c.getDataInfo().getInferenceprovenance()).isPresent())
return true;
return !(c
.getClassid()
.equalsIgnoreCase(country) &&
c.getDataInfo().getInferenceprovenance().equals("propagation"));
}
}

View File

@ -0,0 +1,108 @@
package eu.dnetlib.dhp.oa.graph.clean.country;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
/**
* @author miriam.baglioni
* @Date 22/07/22
*/
public class GetDatasourceFromCountry implements Serializable {
private static final Logger log = LoggerFactory.getLogger(GetDatasourceFromCountry.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
GetDatasourceFromCountry.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/input_datasource_country_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
String country = parser.get("country");
log.info("country: {}", country);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
getDatasourceFromCountry(spark, country, inputPath, workingPath);
});
}
private static void getDatasourceFromCountry(SparkSession spark, String country, String inputPath,
String workingPath) {
Dataset<Organization> organization = spark
.read()
.textFile(inputPath + "/organization")
.map(
(MapFunction<String, Organization>) value -> OBJECT_MAPPER.readValue(value, Organization.class),
Encoders.bean(Organization.class))
.filter(
(FilterFunction<Organization>) o -> !o.getDataInfo().getDeletedbyinference() &&
o.getCountry().getClassid().length() > 0 &&
o.getCountry().getClassid().equals(country));
;
// filtering of the relations taking the non deleted by inference and those with IsProvidedBy as relclass
Dataset<Relation> relation = spark
.read()
.textFile(inputPath + "/relation")
.map(
(MapFunction<String, Relation>) value -> OBJECT_MAPPER.readValue(value, Relation.class),
Encoders.bean(Relation.class))
.filter(
(FilterFunction<Relation>) rel -> rel.getRelClass().equalsIgnoreCase(ModelConstants.IS_PROVIDED_BY) &&
!rel.getDataInfo().getDeletedbyinference());
organization
.joinWith(relation, organization.col("id").equalTo(relation.col("target")), "left")
.map((MapFunction<Tuple2<Organization, Relation>, String>) t2 -> t2._2().getSource(), Encoders.STRING())
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath);
}
}

View File

@ -14,8 +14,8 @@
<description>the address of the lookUp service</description> <description>the address of the lookUp service</description>
</property> </property>
<property> <property>
<name>shouldCleanContext</name> <name>shouldClean</name>
<description>true if the context have to be cleaned</description> <description>true if the operation of deletion of not needed values from the results have to be performed</description>
</property> </property>
<property> <property>
<name>contextId</name> <name>contextId</name>
@ -30,6 +30,22 @@
<description>It is the constrint to be verified. This time is hardcoded as gcube and it is searched for in <description>It is the constrint to be verified. This time is hardcoded as gcube and it is searched for in
the title. If title starts with gcube than the context sobigdata will be removed by the result if present</description> the title. If title starts with gcube than the context sobigdata will be removed by the result if present</description>
</property> </property>
<property>
<name>verifyCountryParam</name>
<value>10.17632;10.5061</value>
<description>It is the constraints to be verified. This time is hardcoded as the starting doi from mendeley and dryad and it is searched for in
the pid value. If the pid value starts with one of the two prefixes, then the country may be removed</description>
</property>
<property>
<name>country</name>
<value>NL</value>
<description>It is the country to be removed from the set of countries if it is present with provenance propagation. The country will not be removed if in one of the isntances there is a datasource with country `country`</description>
</property>
<property>
<name>collectedfrom</name>
<value>NARCIS</value>
<description>the only datasource for which the country NL will be removed from the country list</description>
</property>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
@ -296,7 +312,7 @@
<decision name="clean_context"> <decision name="clean_context">
<switch> <switch>
<case to="fork_clean_context">${wf:conf('shouldCleanContext') eq true}</case> <case to="fork_clean_context">${wf:conf('shouldClean') eq true}</case>
<default to="End"/> <default to="End"/>
</switch> </switch>
</decision> </decision>
@ -416,7 +432,158 @@
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<join name="wait_clean_context" to="End"/> <join name="wait_clean_context" to="getHostedby"/>
<action name="getHostedby">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean publications context</name>
<class>eu.dnetlib.dhp.oa.graph.clean.country.GetDatasourceFromCountry</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}</arg>
<arg>--workingPath</arg><arg>${workingDir}/working/hostedby</arg>
<arg>--country</arg><arg>${country}</arg>
</spark>
<ok to="fork_clean_country"/>
<error to="Kill"/>
</action>
<fork name="fork_clean_country">
<path start="clean_publication_country"/>
<path start="clean_dataset_country"/>
<path start="clean_otherresearchproduct_country"/>
<path start="clean_software_country"/>
</fork>
<action name="clean_publication_country">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean publications counmtry</name>
<class>eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--workingPath</arg><arg>${workingDir}/working/publication</arg>
<arg>--country</arg><arg>${country}</arg>
<arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
<arg>--datasourcePath</arg><arg>${workingDir}/working/hostedby</arg>
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
</spark>
<ok to="wait_clean_context"/>
<error to="Kill"/>
</action>
<action name="clean_dataset_country">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean datasets Country</name>
<class>eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--workingPath</arg><arg>${workingDir}/working/dataset</arg>
<arg>--country</arg><arg>${country}</arg>
<arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
<arg>--datasourcePath</arg><arg>${workingDir}/working/hostedby</arg>
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
</spark>
<ok to="wait_clean_context"/>
<error to="Kill"/>
</action>
<action name="clean_otherresearchproduct_country">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean otherresearchproducts country</name>
<class>eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--workingPath</arg><arg>${workingDir}/working/otherresearchproduct</arg>
<arg>--country</arg><arg>${country}</arg>
<arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
<arg>--datasourcePath</arg><arg>${workingDir}/working/hostedby</arg>
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
</spark>
<ok to="wait_clean_context"/>
<error to="Kill"/>
</action>
<action name="clean_software_country">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean softwares country</name>
<class>eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--workingPath</arg><arg>${workingDir}/working/software</arg>
<arg>--country</arg><arg>${country}</arg>
<arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
<arg>--datasourcePath</arg><arg>${workingDir}/working/hostedby</arg>
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
</spark>
<ok to="wait_clean_country"/>
<error to="Kill"/>
</action>
<join name="wait_clean_country" to="End"/>
<end name="End"/> <end name="End"/>
</workflow-app> </workflow-app>

View File

@ -0,0 +1,49 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "in",
"paramLongName": "inputPath",
"paramDescription": "the path to the graph data dump to read",
"paramRequired": true
},
{
"paramName": "wp",
"paramLongName": "workingPath",
"paramDescription": "the path to store the output graph",
"paramRequired": true
},
{
"paramName": "c",
"paramLongName": "country",
"paramDescription": "the id of the context to be removed",
"paramRequired": true
},
{
"paramName": "class",
"paramLongName": "graphTableClassName",
"paramDescription": "class name moelling the graph table",
"paramRequired": true
},{
"paramName": "vf",
"paramLongName": "verifyParam",
"paramDescription": "the parameter to be verified to remove the country",
"paramRequired": true
},
{
"paramName": "cf",
"paramLongName": "collectedfrom",
"paramDescription": "the collectedfrom value for which we should apply the cleaning",
"paramRequired": true
},
{
"paramName": "hb",
"paramLongName": "hostedBy",
"paramDescription": "the set of datasources having the specified country in the graph searched for in the hostedby of the results",
"paramRequired": true
}
]

View File

@ -0,0 +1,26 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "in",
"paramLongName": "inputPath",
"paramDescription": "the path to the graph data dump to read",
"paramRequired": true
},
{
"paramName": "wp",
"paramLongName": "workingPath",
"paramDescription": "the path to store the output graph",
"paramRequired": true
},
{
"paramName": "c",
"paramLongName": "country",
"paramDescription": "the id of the context to be removed",
"paramRequired": true
}
]

View File

@ -0,0 +1,150 @@
package eu.dnetlib.dhp.oa.graph.clean;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author miriam.baglioni
* @Date 20/07/22
*/
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob;
import eu.dnetlib.dhp.schema.oaf.Publication;
public class CleanCountryTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory.getLogger(CleanContextTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(CleanCountryTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(CleanCountryTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(CleanCountryTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
public void testResultClean() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/publication_clean_country.json")
.getPath();
spark
.read()
.textFile(sourcePath)
.map(
(MapFunction<String, Publication>) r -> OBJECT_MAPPER.readValue(r, Publication.class),
Encoders.bean(Publication.class))
.write()
.json(workingDir.toString() + "/publication");
CleanCountrySparkJob.main(new String[] {
"--isSparkSessionManaged", Boolean.FALSE.toString(),
"--inputPath", workingDir.toString() + "/publication",
"-graphTableClassName", Publication.class.getCanonicalName(),
"-workingPath", workingDir.toString() + "/working",
"-country", "NL",
"-verifyParam", "10.17632",
"-collectedfrom", "NARCIS",
"-hostedBy", getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy")
.getPath()
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Publication> tmp = sc
.textFile(workingDir.toString() + "/publication")
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
Assertions.assertEquals(8, tmp.count());
// original result with NL country and doi starting with Mendely prefix, but not collectedfrom NARCIS
Assertions
.assertEquals(
1,
tmp
.filter(p -> p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439c7a95"))
.collect()
.get(0)
.getCountry()
.size());
// original result with NL country and pid not starting with Mendely prefix
Assertions
.assertEquals(
1,
tmp
.filter(p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9"))
.collect()
.get(0)
.getCountry()
.size());
// original result with NL country and doi starting with Mendely prefix and collectedfrom NARCIS but not
// inserted with propagation
Assertions
.assertEquals(
1,
tmp
.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af"))
.collect()
.get(0)
.getCountry()
.size());
// original result with NL country and doi starting with Mendely prefix and collectedfrom NARCIS inserted with
// propagation
Assertions
.assertEquals(
0,
tmp
.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6ag"))
.collect()
.get(0)
.getCountry()
.size());
}
}

View File

@ -6,3 +6,10 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
# Change this to set Spark log level
log4j.logger.org.apache.spark=WARN
# Silence akka remoting
log4j.logger.Remoting=WARN
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN

View File

@ -92,6 +92,17 @@ compute stats indi_funded_result_with_fundref;
-- --
-- compute stats indi_result_org_collab; -- compute stats indi_result_org_collab;
-- --
create table indi_result_org_collab stored as parquet as
with tmp as (
select distinct ro.organization organization, ro.id from result_organization ro
join organization o on o.id=ro.organization where o.name is not null)
select o1.organization org1, o2.organization org2, count(o1.id) as collaborations
from tmp as o1
join tmp as o2 on o1.id=o2.id and o1.organization!=o2.organization
group by org1, org2;
compute stats indi_result_org_collab;
-- create table indi_result_org_country_collab stored as parquet as -- create table indi_result_org_country_collab stored as parquet as
-- with tmp as -- with tmp as
-- (select o.id as id, o.country , ro.id as result,r.type from organization o -- (select o.id as id, o.country , ro.id as result,r.type from organization o
@ -105,6 +116,17 @@ compute stats indi_funded_result_with_fundref;
-- --
-- compute stats indi_result_org_country_collab; -- compute stats indi_result_org_country_collab;
-- --
create table indi_result_org_country_collab stored as parquet as
with tmp as
(select distinct ro.organization organization, ro.id, o.country from result_organization ro
join organization o on o.id=ro.organization where country <> 'UNKNOWN' and o.name is not null)
select o1.organization org1,o2.country country2, count(o1.id) as collaborations
from tmp as o1 join tmp as o2 on o1.id=o2.id
where o1.id=o2.id and o1.country!=o2.country
group by o1.organization, o1.id, o2.country;
compute stats indi_result_org_country_collab;
-- create table indi_result_org_collab stored as parquet as -- create table indi_result_org_collab stored as parquet as
-- with tmp as -- with tmp as
-- (select o.id, ro.id as result,r.type from organization o -- (select o.id, ro.id as result,r.type from organization o
@ -166,6 +188,19 @@ compute stats indi_funder_country_collab;
-- --
-- compute stats indi_result_country_collab; -- compute stats indi_result_country_collab;
create table indi_result_country_collab stored as parquet as
with tmp as
(select distinct country, ro.id as result from organization o
join result_organization ro on o.id=ro.organization
where country <> 'UNKNOWN' and o.name is not null)
select o1.country country1, o2.country country2, count(o1.result) as collaborations
from tmp as o1
join tmp as o2 on o1.result=o2.result
where o1.country<>o2.country
group by o1.country, o2.country;
compute stats indi_result_country_collab;
---- Sprint 4 ---- ---- Sprint 4 ----
create table indi_pub_diamond stored as parquet as create table indi_pub_diamond stored as parquet as
select distinct pd.id, coalesce(in_diamond_journal, 0) as in_diamond_journal select distinct pd.id, coalesce(in_diamond_journal, 0) as in_diamond_journal

View File

@ -23,7 +23,7 @@ create table TARGET.result stored as parquet as
'openorgs____::d2a09b9d5eabb10c95f9470e172d05d2', --??? Not exists ?? 'openorgs____::d2a09b9d5eabb10c95f9470e172d05d2', --??? Not exists ??
'openorgs____::d169c7407dd417152596908d48c11460', --Masaryk University 'openorgs____::d169c7407dd417152596908d48c11460', --Masaryk University
'openorgs____::1ec924b1759bb16d0a02f2dad8689b21', --University of Belgrade 'openorgs____::1ec924b1759bb16d0a02f2dad8689b21', --University of Belgrade
'openorgs____::2fb1e47b4612688d9de9169d579939a7', --University of Helsinki 'openorgs____::0ae431b820e4c33db8967fbb2b919150', --University of Helsinki
'openorgs____::759d59f05d77188faee99b7493b46805', --University of Minho 'openorgs____::759d59f05d77188faee99b7493b46805', --University of Minho
'openorgs____::cad284878801b9465fa51a95b1d779db', --Universidad Politécnica de Madrid 'openorgs____::cad284878801b9465fa51a95b1d779db', --Universidad Politécnica de Madrid
'openorgs____::eadc8da90a546e98c03f896661a2e4d4', --University of Göttingen 'openorgs____::eadc8da90a546e98c03f896661a2e4d4', --University of Göttingen
@ -48,7 +48,8 @@ create table TARGET.result stored as parquet as
'openorgs____::4ac562f0376fce3539504567649cb373', -- University of Patras 'openorgs____::4ac562f0376fce3539504567649cb373', -- University of Patras
'openorgs____::3e8d1f8c3f6cd7f418b09f1f58b4873b', -- Aristotle University of Thessaloniki 'openorgs____::3e8d1f8c3f6cd7f418b09f1f58b4873b', -- Aristotle University of Thessaloniki
'openorgs____::3fcef6e1c469c10f2a84b281372c9814', -- World Bank 'openorgs____::3fcef6e1c469c10f2a84b281372c9814', -- World Bank
'openorgs____::1698a2eb1885ef8adb5a4a969e745ad3' -- École des Ponts ParisTech 'openorgs____::1698a2eb1885ef8adb5a4a969e745ad3', -- École des Ponts ParisTech
'openorgs____::e15adb13c4dadd49de4d35c39b5da93a' -- Nanyang Technological University
) )) foo; ) )) foo;
compute stats TARGET.result; compute stats TARGET.result;
@ -154,50 +155,44 @@ create table TARGET.project_results stored as parquet as select id as result, pr
compute stats TARGET.project_results; compute stats TARGET.project_results;
-- indicators -- indicators
-- Sprint 1 ----
create table TARGET.indi_pub_green_oa stored as parquet as select * from SOURCE.indi_pub_green_oa orig where exists (select 1 from TARGET.result r where r.id=orig.id); create table TARGET.indi_pub_green_oa stored as parquet as select * from SOURCE.indi_pub_green_oa orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_pub_green_oa; compute stats TARGET.indi_pub_green_oa;
create table TARGET.indi_pub_grey_lit stored as parquet as select * from SOURCE.indi_pub_grey_lit orig where exists (select 1 from TARGET.result r where r.id=orig.id); create table TARGET.indi_pub_grey_lit stored as parquet as select * from SOURCE.indi_pub_grey_lit orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_pub_grey_lit; compute stats TARGET.indi_pub_grey_lit;
create table TARGET.indi_pub_doi_from_crossref stored as parquet as select * from SOURCE.indi_pub_doi_from_crossref orig where exists (select 1 from TARGET.result r where r.id=orig.id); create table TARGET.indi_pub_doi_from_crossref stored as parquet as select * from SOURCE.indi_pub_doi_from_crossref orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_pub_doi_from_crossref; compute stats TARGET.indi_pub_doi_from_crossref;
create table TARGET.indi_pub_gold_oa stored as parquet as select * from SOURCE.indi_pub_gold_oa orig where exists (select 1 from TARGET.result r where r.id=orig.id); -- Sprint 2 ----
compute stats TARGET.indi_pub_gold_oa;
--create table TARGET.indi_datasets_gold_oa stored as parquet as select * from SOURCE.indi_datasets_gold_oa orig where exists (select 1 from TARGET.result r where r.id=orig.id);
--compute stats TARGET.indi_datasets_gold_oa;
--create table TARGET.indi_software_gold_oa stored as parquet as select * from SOURCE.indi_software_gold_oa orig where exists (select 1 from TARGET.result r where r.id=orig.id);
--compute stats TARGET.indi_software_gold_oa;
create table TARGET.indi_pub_has_abstract stored as parquet as select * from SOURCE.indi_pub_has_abstract orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_pub_has_abstract;
create table TARGET.indi_result_has_cc_licence stored as parquet as select * from SOURCE.indi_result_has_cc_licence orig where exists (select 1 from TARGET.result r where r.id=orig.id); create table TARGET.indi_result_has_cc_licence stored as parquet as select * from SOURCE.indi_result_has_cc_licence orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_result_has_cc_licence; compute stats TARGET.indi_result_has_cc_licence;
create table TARGET.indi_result_has_cc_licence_url stored as parquet as select * from SOURCE.indi_result_has_cc_licence_url orig where exists (select 1 from TARGET.result r where r.id=orig.id); create table TARGET.indi_result_has_cc_licence_url stored as parquet as select * from SOURCE.indi_result_has_cc_licence_url orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_result_has_cc_licence_url; compute stats TARGET.indi_result_has_cc_licence_url;
create table TARGET.indi_pub_has_abstract stored as parquet as select * from SOURCE.indi_pub_has_abstract orig where exists (select 1 from TARGET.result r where r.id=orig.id);
create view TARGET.indi_funder_country_collab as select * from SOURCE.indi_funder_country_collab; compute stats TARGET.indi_pub_has_abstract;
create view TARGET.indi_project_collab_org as select * from SOURCE.indi_project_collab_org;
create view TARGET.indi_project_collab_org_country as select * from SOURCE.indi_project_collab_org_country;
create table TARGET.indi_result_with_orcid stored as parquet as select * from SOURCE.indi_result_with_orcid orig where exists (select 1 from TARGET.result r where r.id=orig.id); create table TARGET.indi_result_with_orcid stored as parquet as select * from SOURCE.indi_result_with_orcid orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_result_with_orcid; compute stats TARGET.indi_result_with_orcid;
---- Sprint 3 ----
create table TARGET.indi_funded_result_with_fundref stored as parquet as select * from SOURCE.indi_funded_result_with_fundref orig where exists (select 1 from TARGET.result r where r.id=orig.id); create table TARGET.indi_funded_result_with_fundref stored as parquet as select * from SOURCE.indi_funded_result_with_fundref orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_funded_result_with_fundref; compute stats TARGET.indi_funded_result_with_fundref;
create view TARGET.indi_result_org_collab as select * from SOURCE.indi_result_org_collab;
create view TARGET.indi_result_org_country_collab as select * from SOURCE.indi_result_org_country_collab;
create view TARGET.indi_project_collab_org as select * from SOURCE.indi_project_collab_org;
create view TARGET.indi_project_collab_org_country as select * from SOURCE.indi_project_collab_org_country;
create view TARGET.indi_funder_country_collab as select * from SOURCE.indi_funder_country_collab;
create view TARGET.indi_result_country_collab as select * from SOURCE.indi_result_country_collab;
---- Sprint 4 ----
create table TARGET.indi_pub_diamond stored as parquet as select * from SOURCE.indi_pub_diamond orig where exists (select 1 from TARGET.result r where r.id=orig.id); create table TARGET.indi_pub_diamond stored as parquet as select * from SOURCE.indi_pub_diamond orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_pub_diamond; compute stats TARGET.indi_pub_diamond;
create table TARGET.indi_pub_hybrid stored as parquet as select * from SOURCE.indi_pub_hybrid orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_pub_hybrid;
create table TARGET.indi_pub_in_transformative stored as parquet as select * from SOURCE.indi_pub_in_transformative orig where exists (select 1 from TARGET.result r where r.id=orig.id); create table TARGET.indi_pub_in_transformative stored as parquet as select * from SOURCE.indi_pub_in_transformative orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_pub_in_transformative; compute stats TARGET.indi_pub_in_transformative;
create table TARGET.indi_pub_closed_other_open stored as parquet as select * from SOURCE.indi_pub_closed_other_open orig where exists (select 1 from TARGET.result r where r.id=orig.id); create table TARGET.indi_pub_closed_other_open stored as parquet as select * from SOURCE.indi_pub_closed_other_open orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_pub_closed_other_open; compute stats TARGET.indi_pub_closed_other_open;
---- Sprint 5 ----
create table TARGET.indi_result_no_of_copies stored as parquet as select * from SOURCE.indi_result_no_of_copies orig where exists (select 1 from TARGET.result r where r.id=orig.id); create table TARGET.indi_result_no_of_copies stored as parquet as select * from SOURCE.indi_result_no_of_copies orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_result_no_of_copies; compute stats TARGET.indi_result_no_of_copies;
---- Sprint 6 ----
create view TARGET.indi_org_findable as select * from SOURCE.indi_org_findable;
create view TARGET.indi_org_openess as select * from SOURCE.indi_org_openess;
create table TARGET.indi_pub_hybrid_oa_with_cc stored as parquet as select * from SOURCE.indi_pub_hybrid_oa_with_cc orig where exists (select 1 from TARGET.result r where r.id=orig.id); create table TARGET.indi_pub_hybrid_oa_with_cc stored as parquet as select * from SOURCE.indi_pub_hybrid_oa_with_cc orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_pub_hybrid_oa_with_cc; compute stats TARGET.indi_pub_hybrid_oa_with_cc;
create table TARGET.indi_pub_downloads stored as parquet as select * from SOURCE.indi_pub_downloads orig where exists (select 1 from TARGET.result r where r.id=orig.result_id); create table TARGET.indi_pub_downloads stored as parquet as select * from SOURCE.indi_pub_downloads orig where exists (select 1 from TARGET.result r where r.id=orig.result_id);
compute stats TARGET.indi_pub_downloads; compute stats TARGET.indi_pub_downloads;
create table TARGET.indi_pub_downloads_datasource stored as parquet as select * from SOURCE.indi_pub_downloads_datasource orig where exists (select 1 from TARGET.result r where r.id=orig.result_id); create table TARGET.indi_pub_downloads_datasource stored as parquet as select * from SOURCE.indi_pub_downloads_datasource orig where exists (select 1 from TARGET.result r where r.id=orig.result_id);
@ -206,6 +201,28 @@ create table TARGET.indi_pub_downloads_year stored as parquet as select * from S
compute stats TARGET.indi_pub_downloads_year; compute stats TARGET.indi_pub_downloads_year;
create table TARGET.indi_pub_downloads_datasource_year stored as parquet as select * from SOURCE.indi_pub_downloads_datasource_year orig where exists (select 1 from TARGET.result r where r.id=orig.result_id); create table TARGET.indi_pub_downloads_datasource_year stored as parquet as select * from SOURCE.indi_pub_downloads_datasource_year orig where exists (select 1 from TARGET.result r where r.id=orig.result_id);
compute stats TARGET.indi_pub_downloads_datasource_year; compute stats TARGET.indi_pub_downloads_datasource_year;
---- Sprint 7 ----
create table TARGET.indi_pub_gold_oa stored as parquet as select * from SOURCE.indi_pub_gold_oa orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_pub_gold_oa;
create table TARGET.indi_pub_hybrid stored as parquet as select * from SOURCE.indi_pub_hybrid orig where exists (select 1 from TARGET.result r where r.id=orig.id);
compute stats TARGET.indi_pub_hybrid;
create view TARGET.indi_org_fairness as select * from SOURCE.indi_org_fairness;
create view TARGET.indi_org_fairness_pub_pr as select * from SOURCE.indi_org_fairness_pub_pr;
create view TARGET.indi_org_fairness_pub_year as select * from SOURCE.indi_org_fairness_pub_year;
create view TARGET.indi_org_fairness_pub as select * from SOURCE.indi_org_fairness_pub;
create view TARGET.indi_org_fairness_year as select * from SOURCE.indi_org_fairness_year;
create view TARGET.indi_org_findable_year as select * from SOURCE.indi_org_findable_year;
create view TARGET.indi_org_findable as select * from SOURCE.indi_org_findable;
create view TARGET.indi_org_openess as select * from SOURCE.indi_org_openess;
create view TARGET.indi_org_openess_year as select * from SOURCE.indi_org_openess_year;
create table TARGET.indi_pub_has_preprint stored as parquet as select * from SOURCE.indi_pub_has_preprint orig where exists (select 1 from TARGET.result r where r.id=orig.id);
create table TARGET.indi_pub_in_subscribed stored as parquet as select * from SOURCE.indi_pub_in_subscribed orig where exists (select 1 from TARGET.result r where r.id=orig.id);
create table TARGET.indi_result_with_pid stored as parquet as select * from SOURCE.indi_result_with_pid orig where exists (select 1 from TARGET.result r where r.id=orig.id);
--create table TARGET.indi_datasets_gold_oa stored as parquet as select * from SOURCE.indi_datasets_gold_oa orig where exists (select 1 from TARGET.result r where r.id=orig.id);
--compute stats TARGET.indi_datasets_gold_oa;
--create table TARGET.indi_software_gold_oa stored as parquet as select * from SOURCE.indi_software_gold_oa orig where exists (select 1 from TARGET.result r where r.id=orig.id);
--compute stats TARGET.indi_software_gold_oa;
--denorm --denorm
alter table TARGET.result rename to TARGET.res_tmp; alter table TARGET.result rename to TARGET.res_tmp;