used the API instead of the IS for bulktagging and propagation for community through organization. Added a new propagation step for communities through projects. Still using the API and not the IS

This commit is contained in:
Miriam Baglioni 2023-10-11 18:17:35 +02:00
parent a3d01ccb24
commit 89184d5b4f
70 changed files with 622 additions and 406 deletions

View File

@ -71,6 +71,9 @@ public class PropagationConstant {
public static final String PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID = "result:community:organization"; public static final String PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID = "result:community:organization";
public static final String PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME = " Propagation of result belonging to community through organization"; public static final String PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME = " Propagation of result belonging to community through organization";
public static final String PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_ID = "result:community:project";
public static final String PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_NAME = " Propagation of result belonging to community through project";
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID = "authorpid:result"; public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID = "authorpid:result";
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME = "Propagation of authors pid to result through semantic relations"; public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME = "Propagation of authors pid to result through semantic relations";

View File

@ -4,22 +4,18 @@ package eu.dnetlib.dhp.api;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import org.apache.http.HttpHeaders;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import com.google.gson.Gson;
/** /**
* @author miriam.baglioni * @author miriam.baglioni
* @Date 06/10/23 * @Date 06/10/23
*/ */
public class QueryCommunityAPI { public class QueryCommunityAPI {
private static final String baseUrl = "https://services.openaire.eu/openaire/"; private static final String PRODUCTION_BASE_URL = "https://services.openaire.eu/openaire/";
private static final String BETA_BASE_URL = "https://beta.services.openaire.eu/openaire/";
private static String get(String geturl) throws IOException { private static String get(String geturl) throws IOException {
URL url = new URL(geturl); URL url = new URL(geturl);
@ -36,25 +32,35 @@ public class QueryCommunityAPI {
return body; return body;
} }
public static String communities() throws IOException { public static String communities(boolean production) throws IOException {
return get(baseUrl + "community/communities"); if (production)
return get(PRODUCTION_BASE_URL + "community/communities");
return get(BETA_BASE_URL + "community/communities");
} }
public static String community(String id) throws IOException { public static String community(String id, boolean production) throws IOException {
return get(baseUrl + "community/" + id); if (production)
return get(PRODUCTION_BASE_URL + "community/" + id);
return get(BETA_BASE_URL + "community/" + id);
} }
public static String communityDatasource(String id) throws IOException { public static String communityDatasource(String id, boolean production) throws IOException {
return get(baseUrl + "community/" + id + "/contentproviders"); if (production)
return get(PRODUCTION_BASE_URL + "community/" + id + "/contentproviders");
return (BETA_BASE_URL + "community/" + id + "/contentproviders");
} }
public static String communityPropagationOrganization(String id) throws IOException { public static String communityPropagationOrganization(String id, boolean production) throws IOException {
return get(baseUrl + "community/" + id + "/propagationOrganizations"); if (production)
return get(PRODUCTION_BASE_URL + "community/" + id + "/propagationOrganizations");
return get(BETA_BASE_URL + "community/" + id + "/propagationOrganizations");
} }
public static String communityProjects(String id, String page, String size) throws IOException { public static String communityProjects(String id, String page, String size, boolean production) throws IOException {
return get(baseUrl + "community/" + id + "/projects/" + page + "/" + size); if (production)
return get(PRODUCTION_BASE_URL + "community/" + id + "/projects/" + page + "/" + size);
return get(BETA_BASE_URL + "community/" + id + "/projects/" + page + "/" + size);
} }
@NotNull @NotNull

View File

@ -30,14 +30,14 @@ public class Utils implements Serializable {
private static final ObjectMapper MAPPER = new ObjectMapper(); private static final ObjectMapper MAPPER = new ObjectMapper();
private static final VerbResolver resolver = VerbResolverFactory.newInstance(); private static final VerbResolver resolver = VerbResolverFactory.newInstance();
public static CommunityConfiguration getCommunityConfiguration() throws IOException { public static CommunityConfiguration getCommunityConfiguration(boolean production) throws IOException {
final Map<String, Community> communities = Maps.newHashMap(); final Map<String, Community> communities = Maps.newHashMap();
List<Community> validCommunities = new ArrayList<>(); List<Community> validCommunities = new ArrayList<>();
getValidCommunities() getValidCommunities(production)
.forEach(community -> { .forEach(community -> {
try { try {
CommunityModel cm = MAPPER CommunityModel cm = MAPPER
.readValue(QueryCommunityAPI.community(community.getId()), CommunityModel.class); .readValue(QueryCommunityAPI.community(community.getId(), production), CommunityModel.class);
validCommunities.add(getCommunity(cm)); validCommunities.add(getCommunity(cm));
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
@ -46,10 +46,11 @@ public class Utils implements Serializable {
validCommunities.forEach(community -> { validCommunities.forEach(community -> {
try { try {
DatasourceList dl = MAPPER DatasourceList dl = MAPPER
.readValue(QueryCommunityAPI.communityDatasource(community.getId()), DatasourceList.class); .readValue(
QueryCommunityAPI.communityDatasource(community.getId(), production), DatasourceList.class);
community.setProviders(dl.stream().map(d -> { community.setProviders(dl.stream().map(d -> {
// if(d.getEnabled() == null || Boolean.FALSE.equals(d.getEnabled())) if (d.getEnabled() == null || Boolean.FALSE.equals(d.getEnabled()))
// return null; return null;
Provider p = new Provider(); Provider p = new Provider();
p.setOpenaireId("10|" + d.getOpenaireId()); p.setOpenaireId("10|" + d.getOpenaireId());
p.setSelectionConstraints(d.getSelectioncriteria()); p.setSelectionConstraints(d.getSelectioncriteria());
@ -80,18 +81,20 @@ public class Utils implements Serializable {
c.setSubjects(cm.getSubjects()); c.setSubjects(cm.getSubjects());
c.getSubjects().addAll(cm.getFos()); c.getSubjects().addAll(cm.getFos());
c.getSubjects().addAll(cm.getSdg()); c.getSubjects().addAll(cm.getSdg());
c.setConstraints(cm.getAdvancedConstraints()); if (cm.getAdvancedConstraints() != null) {
if (c.getConstraints() != null) c.setConstraints(cm.getAdvancedConstraints());
c.getConstraints().setSelection(resolver); c.getConstraints().setSelection(resolver);
c.setRemoveConstraints(cm.getRemoveConstraints()); }
if (c.getRemoveConstraints() != null) if (cm.getRemoveConstraints() != null) {
c.setRemoveConstraints(cm.getRemoveConstraints());
c.getRemoveConstraints().setSelection(resolver); c.getRemoveConstraints().setSelection(resolver);
}
return c; return c;
} }
public static List<CommunityModel> getValidCommunities() throws IOException { public static List<CommunityModel> getValidCommunities(boolean production) throws IOException {
return MAPPER return MAPPER
.readValue(QueryCommunityAPI.communities(), CommunitySummary.class) .readValue(QueryCommunityAPI.communities(production), CommunitySummary.class)
.stream() .stream()
.filter( .filter(
community -> !community.getStatus().equals("hidden") && community -> !community.getStatus().equals("hidden") &&
@ -99,17 +102,26 @@ public class Utils implements Serializable {
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
public static CommunityEntityMap getCommunityOrganization() throws IOException { /**
* it returns for each organization the list of associated communities
*/
public static CommunityEntityMap getCommunityOrganization(boolean production) throws IOException {
CommunityEntityMap organizationMap = new CommunityEntityMap(); CommunityEntityMap organizationMap = new CommunityEntityMap();
getValidCommunities() getValidCommunities(production)
.forEach(community -> { .forEach(community -> {
String id = community.getId(); String id = community.getId();
try { try {
List<String> associatedOrgs = MAPPER List<String> associatedOrgs = MAPPER
.readValue(QueryCommunityAPI.communityPropagationOrganization(id), OrganizationList.class); .readValue(
if (associatedOrgs.size() > 0) { QueryCommunityAPI.communityPropagationOrganization(id, production), OrganizationList.class);
organizationMap.put(id, associatedOrgs); associatedOrgs.forEach(o -> {
} if (!organizationMap
.keySet()
.contains(
"20|" + o))
organizationMap.put("20|" + o, new ArrayList<>());
organizationMap.get("20|" + o).add(community.getId());
});
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -117,26 +129,28 @@ public class Utils implements Serializable {
return organizationMap; return organizationMap;
} }
public static CommunityEntityMap getCommunityProjects() throws IOException { public static CommunityEntityMap getCommunityProjects(boolean production) throws IOException {
CommunityEntityMap projectMap = new CommunityEntityMap(); CommunityEntityMap projectMap = new CommunityEntityMap();
getValidCommunities() getValidCommunities(production)
.forEach(community -> { .forEach(community -> {
int page = -1; int page = -1;
int size = 100; int size = 100;
ContentModel cm = new ContentModel(); ContentModel cm = new ContentModel();
List<String> projectList = new ArrayList<>();
do { do {
page++; page++;
try { try {
cm = MAPPER cm = MAPPER
.readValue( .readValue(
QueryCommunityAPI QueryCommunityAPI
.communityProjects(community.getId(), String.valueOf(page), String.valueOf(size)), .communityProjects(
community.getId(), String.valueOf(page), String.valueOf(size), production),
ContentModel.class); ContentModel.class);
if (cm.getContent().size() > 0) { if (cm.getContent().size() > 0) {
cm.getContent().forEach(p -> {
cm.getContent().forEach(p -> projectList.add("40|" + p.getOpenaireId())); if (!projectMap.keySet().contains("40|" + p.getOpenaireId()))
projectMap.put(community.getId(), projectList); projectMap.put("40|" + p.getOpenaireId(), new ArrayList<>());
projectMap.get("40|" + p.getOpenaireId()).add(community.getId());
});
} }
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);

View File

@ -23,6 +23,8 @@ import com.google.gson.Gson;
import eu.dnetlib.dhp.api.Utils; import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.bulktag.community.*; import eu.dnetlib.dhp.bulktag.community.*;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Result;
@ -53,50 +55,38 @@ public class SparkBulkTagJob {
.orElse(Boolean.TRUE); .orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged); log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
Boolean isTest = Optional
.ofNullable(parser.get("isTest"))
.map(Boolean::valueOf)
.orElse(Boolean.FALSE);
log.info("isTest: {} ", isTest);
final String inputPath = parser.get("sourcePath"); final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath); log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath"); final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath); log.info("outputPath: {}", outputPath);
final boolean production = Boolean.valueOf(parser.get("production"));
log.info("production: {}", production);
ProtoMap protoMappingParams = new Gson().fromJson(parser.get("pathMap"), ProtoMap.class); ProtoMap protoMappingParams = new Gson().fromJson(parser.get("pathMap"), ProtoMap.class);
log.info("pathMap: {}", new Gson().toJson(protoMappingParams)); log.info("pathMap: {}", new Gson().toJson(protoMappingParams));
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final Boolean saveGraph = Optional
.ofNullable(parser.get("saveGraph"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("saveGraph: {}", saveGraph);
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
CommunityConfiguration cc; CommunityConfiguration cc;
String taggingConf = parser.get("taggingConf"); String taggingConf = Optional
.ofNullable(parser.get("taggingConf"))
.map(String::valueOf)
.orElse(null);
if (isTest) { if (taggingConf != null) {
cc = CommunityConfigurationFactory.newInstance(taggingConf); cc = CommunityConfigurationFactory.newInstance(taggingConf);
} else { } else {
cc = Utils.getCommunityConfiguration();// QueryInformationSystem.getCommunityConfiguration(parser.get("isLookUpUrl")); cc = Utils.getCommunityConfiguration(production);
} }
runWithSparkSession( runWithSparkSession(
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
removeOutputDir(spark, outputPath);
extendCommunityConfigurationForEOSC(spark, inputPath, cc); extendCommunityConfigurationForEOSC(spark, inputPath, cc);
execBulkTag(spark, inputPath, outputPath, protoMappingParams, resultClazz, cc); execBulkTag(spark, inputPath, outputPath, protoMappingParams, cc);
}); });
} }
@ -141,22 +131,30 @@ public class SparkBulkTagJob {
String inputPath, String inputPath,
String outputPath, String outputPath,
ProtoMap protoMappingParams, ProtoMap protoMappingParams,
Class<R> resultClazz,
CommunityConfiguration communityConfiguration) { CommunityConfiguration communityConfiguration) {
ResultTagger resultTagger = new ResultTagger(); ModelSupport.entityTypes
readPath(spark, inputPath, resultClazz) .keySet()
.map(patchResult(), Encoders.bean(resultClazz)) .parallelStream()
.filter(Objects::nonNull) .filter(e -> ModelSupport.isResult(e))
.map( .forEach(e -> {
(MapFunction<R, R>) value -> resultTagger removeOutputDir(spark, outputPath + e.name());
.enrichContextCriteria( ResultTagger resultTagger = new ResultTagger();
value, communityConfiguration, protoMappingParams), Class<R> resultClazz = ModelSupport.entityTypes.get(e);
Encoders.bean(resultClazz)) readPath(spark, inputPath + e.name(), resultClazz)
.write() .map(patchResult(), Encoders.bean(resultClazz))
.mode(SaveMode.Overwrite) .filter(Objects::nonNull)
.option("compression", "gzip") .map(
.json(outputPath); (MapFunction<R, R>) value -> resultTagger
.enrichContextCriteria(
value, communityConfiguration, protoMappingParams),
Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + e.name());
});
} }
public static <R> Dataset<R> readPath( public static <R> Dataset<R> readPath(

View File

@ -82,19 +82,23 @@ public class ResultTagger implements Serializable {
// communities contains all the communities to be not added to the context // communities contains all the communities to be not added to the context
final Set<String> removeCommunities = new HashSet<>(); final Set<String> removeCommunities = new HashSet<>();
// if (conf.getRemoveConstraintsMap().keySet().size() > 0)
conf conf
.getRemoveConstraintsMap() .getRemoveConstraintsMap()
.keySet() .keySet()
.forEach(communityId -> { .forEach(
if (conf.getRemoveConstraintsMap().get(communityId).getCriteria() != null && communityId -> {
conf // log.info("Remove constraints for " + communityId);
.getRemoveConstraintsMap() if (conf.getRemoveConstraintsMap().keySet().contains(communityId) &&
.get(communityId) conf.getRemoveConstraintsMap().get(communityId).getCriteria() != null &&
.getCriteria() conf
.stream() .getRemoveConstraintsMap()
.anyMatch(crit -> crit.verifyCriteria(param))) .get(communityId)
removeCommunities.add(communityId); .getCriteria()
}); .stream()
.anyMatch(crit -> crit.verifyCriteria(param)))
removeCommunities.add(communityId);
});
// communities contains all the communities to be added as context for the result // communities contains all the communities to be added as context for the result
final Set<String> communities = new HashSet<>(); final Set<String> communities = new HashSet<>();
@ -124,10 +128,10 @@ public class ResultTagger implements Serializable {
if (Objects.nonNull(result.getInstance())) { if (Objects.nonNull(result.getInstance())) {
for (Instance i : result.getInstance()) { for (Instance i : result.getInstance()) {
if (Objects.nonNull(i.getCollectedfrom()) && Objects.nonNull(i.getCollectedfrom().getKey())) { if (Objects.nonNull(i.getCollectedfrom()) && Objects.nonNull(i.getCollectedfrom().getKey())) {
collfrom.add(StringUtils.substringAfter(i.getCollectedfrom().getKey(), "|")); collfrom.add(i.getCollectedfrom().getKey());
} }
if (Objects.nonNull(i.getHostedby()) && Objects.nonNull(i.getHostedby().getKey())) { if (Objects.nonNull(i.getHostedby()) && Objects.nonNull(i.getHostedby().getKey())) {
hostdby.add(StringUtils.substringAfter(i.getHostedby().getKey(), "|")); hostdby.add(i.getHostedby().getKey());
} }
} }

View File

@ -48,11 +48,10 @@ public class PrepareResultCommunitySet {
final String outputPath = parser.get("outputPath"); final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath); log.info("outputPath: {}", outputPath);
// final CommunityEntityMap organizationMap = new Gson() final boolean production = Boolean.valueOf(parser.get("production"));
// .fromJson( log.info("production: {}", production);
// parser.get("organizationtoresultcommunitymap"),
// CommunityEntityMap.class); final CommunityEntityMap organizationMap = Utils.getCommunityOrganization(production);
final CommunityEntityMap organizationMap = Utils.getCommunityOrganization();
log.info("organizationMap: {}", new Gson().toJson(organizationMap)); log.info("organizationMap: {}", new Gson().toJson(organizationMap));
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();

View File

@ -56,12 +56,6 @@ public class SparkResultToCommunityFromOrganizationJob {
final String resultClassName = parser.get("resultTableName"); final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName); log.info("resultTableName: {}", resultClassName);
final Boolean saveGraph = Optional
.ofNullable(parser.get("saveGraph"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("saveGraph: {}", saveGraph);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName); Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
@ -73,9 +67,9 @@ public class SparkResultToCommunityFromOrganizationJob {
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
if (saveGraph) {
execPropagation(spark, inputPath, outputPath, resultClazz, possibleupdatespath); execPropagation(spark, inputPath, outputPath, resultClazz, possibleupdatespath);
}
}); });
} }

View File

@ -0,0 +1,121 @@
package eu.dnetlib.dhp.resulttocommunityfromproject;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.*;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.api.model.CommunityEntityMap;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultOrganizations;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;
public class PrepareResultCommunitySet {
private static final Logger log = LoggerFactory.getLogger(PrepareResultCommunitySet.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareResultCommunitySet.class
.getResourceAsStream(
"/eu/dnetlib/dhp/resulttocommunityfromproject/input_preparecommunitytoresult_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final boolean production = Boolean.valueOf(parser.get("outputPath"));
log.info("production: {}", production);
final CommunityEntityMap projectsMap = Utils.getCommunityProjects(production);
log.info("projectsMap: {}", new Gson().toJson(projectsMap));
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
prepareInfo(spark, inputPath, outputPath, projectsMap);
});
}
private static void prepareInfo(
SparkSession spark,
String inputPath,
String outputPath,
CommunityEntityMap projectMap) {
final StructType structureSchema = new StructType()
.add(
"dataInfo", new StructType()
.add("deletedbyinference", DataTypes.BooleanType)
.add("invisible", DataTypes.BooleanType))
.add("source", DataTypes.StringType)
.add("target", DataTypes.StringType)
.add("relClass", DataTypes.StringType);
spark
.read()
.schema(structureSchema)
.json(inputPath)
.filter(
"dataInfo.deletedbyinference != true " +
"and relClass == '" + ModelConstants.IS_PRODUCED_BY + "'")
.select(
new Column("source").as("resultId"),
new Column("target").as("projectId"))
.groupByKey((MapFunction<Row, String>) r -> (String) r.getAs("source"), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Row, ResultProjectList>) (k, v) -> {
ResultProjectList rpl = new ResultProjectList();
rpl.setResultId(k);
ArrayList<String> cl = new ArrayList<>();
cl.addAll(projectMap.get(v.next().getAs("target")));
v.forEachRemaining(r -> {
projectMap
.get(r.getAs("target"))
.forEach(c -> {
if (!cl.contains(c))
cl.add(c);
});
});
rpl.setCommunityList(cl);
return rpl;
}, Encoders.bean(ResultProjectList.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
}

View File

@ -0,0 +1,26 @@
package eu.dnetlib.dhp.resulttocommunityfromproject;
import java.io.Serializable;
import java.util.ArrayList;
public class ResultProjectList implements Serializable {
private String resultId;
private ArrayList<String> communityList;
public String getResultId() {
return resultId;
}
public void setResultId(String resultId) {
this.resultId = resultId;
}
public ArrayList<String> getCommunityList() {
return communityList;
}
public void setCommunityList(ArrayList<String> communityList) {
this.communityList = communityList;
}
}

View File

@ -0,0 +1,156 @@
package eu.dnetlib.dhp.resulttocommunityfromproject;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.PropagationConstant.PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
import eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.Result;
import scala.Tuple2;
/**
* @author miriam.baglioni
* @Date 11/10/23
*/
public class SparkResultToCommunityFromProject implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkResultToCommunityFromProject.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkResultToCommunityFromProject.class
.getResourceAsStream(
"/eu/dnetlib/dhp/resulttocommunityfromproject/input_communitytoresult_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String possibleupdatespath = parser.get("preparedInfoPath");
log.info("preparedInfoPath: {}", possibleupdatespath);
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
@SuppressWarnings("unchecked")
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
// removeOutputDir(spark, outputPath);
execPropagation(spark, inputPath, outputPath, possibleupdatespath);
});
}
private static <R extends Result> void execPropagation(
SparkSession spark,
String inputPath,
String outputPath,
String possibleUpdatesPath) {
Dataset<ResultProjectList> possibleUpdates = readPath(spark, possibleUpdatesPath, ResultProjectList.class);
ModelSupport.entityTypes
.keySet()
.parallelStream()
.forEach(e -> {
if (ModelSupport.isResult(e)) {
removeOutputDir(spark, outputPath + e.name());
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
Dataset<R> result = readPath(spark, inputPath + e.name(), resultClazz);
result
.joinWith(
possibleUpdates,
result.col("id").equalTo(possibleUpdates.col("resultId")),
"left_outer")
.map(resultCommunityFn(), Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
});
}
private static <R extends Result> MapFunction<Tuple2<R, ResultProjectList>, R> resultCommunityFn() {
return value -> {
R ret = value._1();
Optional<ResultProjectList> rcl = Optional.ofNullable(value._2());
if (rcl.isPresent()) {
ArrayList<String> communitySet = rcl.get().getCommunityList();
List<String> contextList = ret
.getContext()
.stream()
.map(Context::getId)
.collect(Collectors.toList());
@SuppressWarnings("unchecked")
R res = (R) ret.getClass().newInstance();
res.setId(ret.getId());
List<Context> propagatedContexts = new ArrayList<>();
for (String cId : communitySet) {
if (!contextList.contains(cId)) {
Context newContext = new Context();
newContext.setId(cId);
newContext
.setDataInfo(
Arrays
.asList(
getDataInfo(
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_ID,
PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS)));
propagatedContexts.add(newContext);
}
}
res.setContext(propagatedContexts);
ret.mergeFrom(res);
}
return ret;
};
}
}

View File

@ -1,10 +1,5 @@
[ [
{
"paramName":"is",
"paramLongName":"isLookUpUrl",
"paramDescription": "URL of the isLookUp Service",
"paramRequired": true
},
{ {
"paramName":"s", "paramName":"s",
"paramLongName":"sourcePath", "paramLongName":"sourcePath",
@ -17,12 +12,7 @@
"paramDescription": "the json path associated to each selection field", "paramDescription": "the json path associated to each selection field",
"paramRequired": true "paramRequired": true
}, },
{
"paramName":"tn",
"paramLongName":"resultTableName",
"paramDescription": "the name of the result table we are currently working on",
"paramRequired": true
},
{ {
"paramName": "out", "paramName": "out",
"paramLongName": "outputPath", "paramLongName": "outputPath",
@ -35,17 +25,19 @@
"paramDescription": "true if the spark session is managed, false otherwise", "paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false "paramRequired": false
}, },
{
"paramName": "test",
"paramLongName": "isTest",
"paramDescription": "Parameter intended for testing purposes only. True if the reun is relatesd to a test and so the taggingConf parameter should be loaded",
"paramRequired": false
},
{ {
"paramName": "tg", "paramName": "tg",
"paramLongName": "taggingConf", "paramLongName": "taggingConf",
"paramDescription": "this parameter is intended for testing purposes only. It is a possible tagging configuration obtained via the XQUERY. Intended to be removed", "paramDescription": "this parameter is intended for testing purposes only. It is a possible tagging configuration obtained via the XQUERY. Intended to be removed",
"paramRequired": false "paramRequired": false
},
{
"paramName": "p",
"paramLongName": "production",
"paramDescription": "this parameter is intended for testing purposes only. It is a possible tagging configuration obtained via the XQUERY. Intended to be removed",
"paramRequired": true
} }
] ]

View File

@ -4,10 +4,6 @@
<name>sourcePath</name> <name>sourcePath</name>
<description>the source path</description> <description>the source path</description>
</property> </property>
<property>
<name>isLookUpUrl</name>
<description>the isLookup service endpoint</description>
</property>
<property> <property>
<name>pathMap</name> <name>pathMap</name>
<description>the json path associated to each selection field</description> <description>the json path associated to each selection field</description>
@ -44,7 +40,7 @@
</configuration> </configuration>
</global> </global>
<start to="reset_outputpath"/> <start to="exec_bulktag"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -102,16 +98,9 @@
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<join name="copy_wait" to="fork_exec_bulktag"/> <join name="copy_wait" to="exec_bulktag"/>
<fork name="fork_exec_bulktag"> <action name="exec_bulktag">
<path start="bulktag_publication"/>
<path start="bulktag_dataset"/>
<path start="bulktag_otherresearchproduct"/>
<path start="bulktag_software"/>
</fork>
<action name="bulktag_publication">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master> <master>yarn-cluster</master>
<mode>cluster</mode> <mode>cluster</mode>
@ -128,98 +117,15 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg> <arg>--sourcePath</arg><arg>${sourcePath}/</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg> <arg>--outputPath</arg><arg>${outputPath}/</arg>
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
<arg>--pathMap</arg><arg>${pathMap}</arg> <arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg> <arg>--production</arg><arg>${production}</arg>
</spark> </spark>
<ok to="wait"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="bulktag_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>bulkTagging-dataset</name>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
<arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="bulktag_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>bulkTagging-orp</name>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
<arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="bulktag_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>bulkTagging-software</name>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${outputPath}/software</arg>
<arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
</action>
<join name="wait" to="End"/>
<end name="End"/> <end name="End"/>

View File

@ -1,62 +0,0 @@
for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType')
let $subj := $x//CONFIGURATION/context/param[./@name='subject']/text()
let $datasources := $x//CONFIGURATION/context/category[./@id=concat($x//CONFIGURATION/context/@id,'::contentproviders')]/concept
let $organizations := $x//CONFIGURATION/context/category[./@id=concat($x//CONFIGURATION/context/@id,'::resultorganizations')]/concept
let $communities := $x//CONFIGURATION/context/category[./@id=concat($x//CONFIGURATION/context/@id,'::zenodocommunities')]/concept
let $fos := $x//CONFIGURATION/context/param[./@name='fos']/text()
let $sdg := $x//CONFIGURATION/context/param[./@name='sdg']/text()
let $zenodo := $x//param[./@name='zenodoCommunity']/text()
where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] and $x//context/param[./@name = 'status']/text() != 'hidden'
return
<community>
{ $x//CONFIGURATION/context/@id}
<removeConstraints>
{$x//CONFIGURATION/context/param[./@name='removeConstraints']/text() }
</removeConstraints>
<advancedConstraints>
{$x//CONFIGURATION/context/param[./@name='advancedConstraints']/text() }
</advancedConstraints>
<subjects>
{for $y in tokenize($subj,',')
return
<subject>{$y}</subject>}
{for $y in tokenize($fos,',')
return
<subject>{$y}</subject>}
{for $y in tokenize($sdg,',')
return
<subject>{$y}</subject>}
</subjects>
<datasources>
{for $d in $datasources
where $d/param[./@name='enabled']/text()='true'
return
<datasource>
<openaireId>
{$d//param[./@name='openaireId']/text()}
</openaireId>
<selcriteria>
{$d/param[./@name='selcriteria']/text()}
</selcriteria>
</datasource> }
</datasources>
<zenodocommunities>
{for $zc in $zenodo
return
<zenodocommunity>
<zenodoid>
{$zc}
</zenodoid>
</zenodocommunity>}
{for $zc in $communities
return
<zenodocommunity>
<zenodoid>
{$zc/param[./@name='zenodoid']/text()}
</zenodoid>
<selcriteria>
{$zc/param[./@name='selcriteria']/text()}
</selcriteria>
</zenodocommunity>}
</zenodocommunities>
</community>

View File

@ -11,18 +11,6 @@
"paramDescription": "the hive metastore uris", "paramDescription": "the hive metastore uris",
"paramRequired": true "paramRequired": true
}, },
{
"paramName":"sg",
"paramLongName":"saveGraph",
"paramDescription": "true if the new version of the graph must be saved",
"paramRequired": false
},
{
"paramName":"test",
"paramLongName":"isTest",
"paramDescription": "true if it is executing a test",
"paramRequired": false
},
{ {
"paramName": "out", "paramName": "out",
"paramLongName": "outputPath", "paramLongName": "outputPath",

View File

@ -4,10 +4,7 @@
<name>sourcePath</name> <name>sourcePath</name>
<description>the source path</description> <description>the source path</description>
</property> </property>
<!-- <property>-->
<!-- <name>organizationtoresultcommunitymap</name>-->
<!-- <description>organization community map</description>-->
<!-- </property>-->
<property> <property>
<name>outputPath</name> <name>outputPath</name>
<description>the output path</description> <description>the output path</description>
@ -106,7 +103,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}/relation</arg> <arg>--sourcePath</arg><arg>${sourcePath}/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg> <arg>--outputPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg> <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--organizationtoresultcommunitymap</arg><arg>${organizationtoresultcommunitymap}</arg>
</spark> </spark>
<ok to="fork-join-exec-propagation"/> <ok to="fork-join-exec-propagation"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -6,6 +6,7 @@ import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.ZENODO_COMMUNITY
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.HashMap;
import java.util.List; import java.util.List;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
@ -98,14 +99,11 @@ public class BulkTagJobTest {
SparkBulkTagJob SparkBulkTagJob
.main( .main(
new String[] { new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", "-sourcePath",
getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/no_updates").getPath(), getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/no_updates/").getPath(),
"-taggingConf", taggingConf, "-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset", "-outputPath", workingDir.toString() + "/",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap "-pathMap", pathMap
}); });
@ -133,19 +131,16 @@ public class BulkTagJobTest {
@Test @Test
void bulktagBySubjectNoPreviousContextTest() throws Exception { void bulktagBySubjectNoPreviousContextTest() throws Exception {
final String sourcePath = getClass() final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/nocontext") .getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/nocontext/")
.getPath(); .getPath();
final String pathMap = BulkTagJobTest.pathMap; final String pathMap = BulkTagJobTest.pathMap;
SparkBulkTagJob SparkBulkTagJob
.main( .main(
new String[] { new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath, "-sourcePath", sourcePath,
"-taggingConf", taggingConf, "-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset", "-outputPath", workingDir.toString() + "/",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap "-pathMap", pathMap
}); });
@ -230,19 +225,19 @@ public class BulkTagJobTest {
void bulktagBySubjectPreviousContextNoProvenanceTest() throws Exception { void bulktagBySubjectPreviousContextNoProvenanceTest() throws Exception {
final String sourcePath = getClass() final String sourcePath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/contextnoprovenance") "/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/contextnoprovenance/")
.getPath(); .getPath();
final String pathMap = BulkTagJobTest.pathMap; final String pathMap = BulkTagJobTest.pathMap;
SparkBulkTagJob SparkBulkTagJob
.main( .main(
new String[] { new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath, "-sourcePath", sourcePath,
"-taggingConf", taggingConf, "-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset", "-outputPath", workingDir.toString() + "/",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap "-pathMap", pathMap
}); });
@ -311,18 +306,18 @@ public class BulkTagJobTest {
@Test @Test
void bulktagByDatasourceTest() throws Exception { void bulktagByDatasourceTest() throws Exception {
final String sourcePath = getClass() final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource") .getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/")
.getPath(); .getPath();
SparkBulkTagJob SparkBulkTagJob
.main( .main(
new String[] { new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath, "-sourcePath", sourcePath,
"-taggingConf", taggingConf, "-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
"-outputPath", workingDir.toString() + "/publication", "-outputPath", workingDir.toString() + "/",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap "-pathMap", pathMap
}); });
@ -384,25 +379,25 @@ public class BulkTagJobTest {
void bulktagByZenodoCommunityTest() throws Exception { void bulktagByZenodoCommunityTest() throws Exception {
final String sourcePath = getClass() final String sourcePath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/bulktag/sample/otherresearchproduct/update_zenodocommunity") "/eu/dnetlib/dhp/bulktag/sample/otherresearchproduct/update_zenodocommunity/")
.getPath(); .getPath();
SparkBulkTagJob SparkBulkTagJob
.main( .main(
new String[] { new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath, "-sourcePath", sourcePath,
"-taggingConf", taggingConf, "-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
"-outputPath", workingDir.toString() + "/orp", "-outputPath", workingDir.toString() + "/",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap "-pathMap", pathMap
}); });
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<OtherResearchProduct> tmp = sc JavaRDD<OtherResearchProduct> tmp = sc
.textFile(workingDir.toString() + "/orp") .textFile(workingDir.toString() + "/otherresearchproduct")
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class)); .map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class));
Assertions.assertEquals(10, tmp.count()); Assertions.assertEquals(10, tmp.count());
@ -505,18 +500,18 @@ public class BulkTagJobTest {
@Test @Test
void bulktagBySubjectDatasourceTest() throws Exception { void bulktagBySubjectDatasourceTest() throws Exception {
final String sourcePath = getClass() final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject_datasource") .getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject_datasource/")
.getPath(); .getPath();
SparkBulkTagJob SparkBulkTagJob
.main( .main(
new String[] { new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath, "-sourcePath", sourcePath,
"-taggingConf", taggingConf, "-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset", "-outputPath", workingDir.toString() + "/",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap "-pathMap", pathMap
}); });
@ -636,14 +631,14 @@ public class BulkTagJobTest {
SparkBulkTagJob SparkBulkTagJob
.main( .main(
new String[] { new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", "-sourcePath",
getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/software/software_10.json.gz").getPath(), getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/software/").getPath(),
"-taggingConf", taggingConf, "-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
"-outputPath", workingDir.toString() + "/software", "-outputPath", workingDir.toString() + "/",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap "-pathMap", pathMap
}); });
@ -732,18 +727,18 @@ public class BulkTagJobTest {
final String sourcePath = getClass() final String sourcePath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/bulktag/sample/dataset/update_datasourcewithconstraints") "/eu/dnetlib/dhp/bulktag/sample/dataset/update_datasourcewithconstraints/")
.getPath(); .getPath();
SparkBulkTagJob SparkBulkTagJob
.main( .main(
new String[] { new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath, "-sourcePath", sourcePath,
"-taggingConf", taggingConf, "-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset", "-outputPath", workingDir.toString() + "/",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap "-pathMap", pathMap
}); });
@ -774,19 +769,19 @@ public class BulkTagJobTest {
void bulkTagOtherJupyter() throws Exception { void bulkTagOtherJupyter() throws Exception {
final String sourcePath = getClass() final String sourcePath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/eosctag/jupyter/otherresearchproduct") "/eu/dnetlib/dhp/eosctag/jupyter/")
.getPath(); .getPath();
SparkBulkTagJob SparkBulkTagJob
.main( .main(
new String[] { new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath, "-sourcePath", sourcePath,
"-taggingConf", taggingConf, "-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
"-outputPath", workingDir.toString() + "/otherresearchproduct", "-outputPath", workingDir.toString() + "/",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap "-pathMap", pathMap
}); });
@ -829,18 +824,18 @@ public class BulkTagJobTest {
public void bulkTagDatasetJupyter() throws Exception { public void bulkTagDatasetJupyter() throws Exception {
final String sourcePath = getClass() final String sourcePath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/eosctag/jupyter/dataset") "/eu/dnetlib/dhp/eosctag/jupyter/")
.getPath(); .getPath();
SparkBulkTagJob SparkBulkTagJob
.main( .main(
new String[] { new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath, "-sourcePath", sourcePath,
"-taggingConf", taggingConf, "-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset", "-outputPath", workingDir.toString() + "/",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap "-pathMap", pathMap
}); });
@ -878,18 +873,18 @@ public class BulkTagJobTest {
final String sourcePath = getClass() final String sourcePath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/eosctag/jupyter/software") "/eu/dnetlib/dhp/eosctag/jupyter/")
.getPath(); .getPath();
SparkBulkTagJob SparkBulkTagJob
.main( .main(
new String[] { new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath, "-sourcePath", sourcePath,
"-taggingConf", taggingConf, "-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
"-outputPath", workingDir.toString() + "/software", "-outputPath", workingDir.toString() + "/",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap "-pathMap", pathMap
}); });
@ -1096,18 +1091,18 @@ public class BulkTagJobTest {
void galaxyOtherTest() throws Exception { void galaxyOtherTest() throws Exception {
final String sourcePath = getClass() final String sourcePath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/eosctag/galaxy/otherresearchproduct") "/eu/dnetlib/dhp/eosctag/galaxy/")
.getPath(); .getPath();
SparkBulkTagJob SparkBulkTagJob
.main( .main(
new String[] { new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath, "-sourcePath", sourcePath,
"-taggingConf", taggingConf, "-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
"-outputPath", workingDir.toString() + "/otherresearchproduct", "-outputPath", workingDir.toString() + "/",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap "-pathMap", pathMap
}); });
@ -1214,18 +1209,18 @@ public class BulkTagJobTest {
void galaxySoftwareTest() throws Exception { void galaxySoftwareTest() throws Exception {
final String sourcePath = getClass() final String sourcePath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/eosctag/galaxy/software") "/eu/dnetlib/dhp/eosctag/galaxy/")
.getPath(); .getPath();
SparkBulkTagJob SparkBulkTagJob
.main( .main(
new String[] { new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath, "-sourcePath", sourcePath,
"-taggingConf", taggingConf, "-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
"-outputPath", workingDir.toString() + "/software", "-outputPath", workingDir.toString() + "/",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap "-pathMap", pathMap
}); });
@ -1333,19 +1328,19 @@ public class BulkTagJobTest {
void twitterDatasetTest() throws Exception { void twitterDatasetTest() throws Exception {
final String sourcePath = getClass() final String sourcePath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/eosctag/twitter/dataset") "/eu/dnetlib/dhp/eosctag/twitter/")
.getPath(); .getPath();
SparkBulkTagJob SparkBulkTagJob
.main( .main(
new String[] { new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath, "-sourcePath", sourcePath,
"-taggingConf", taggingConf, "-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset", "-outputPath", workingDir.toString() + "/",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap "-pathMap", pathMap
}); });
@ -1373,19 +1368,19 @@ public class BulkTagJobTest {
void twitterOtherTest() throws Exception { void twitterOtherTest() throws Exception {
final String sourcePath = getClass() final String sourcePath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/eosctag/twitter/otherresearchproduct") "/eu/dnetlib/dhp/eosctag/twitter/")
.getPath(); .getPath();
SparkBulkTagJob SparkBulkTagJob
.main( .main(
new String[] { new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath, "-sourcePath", sourcePath,
"-taggingConf", taggingConf, "-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
"-outputPath", workingDir.toString() + "/otherresearchproduct", "-outputPath", workingDir.toString() + "/",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap "-pathMap", pathMap
}); });
@ -1418,19 +1413,19 @@ public class BulkTagJobTest {
void twitterSoftwareTest() throws Exception { void twitterSoftwareTest() throws Exception {
final String sourcePath = getClass() final String sourcePath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/eosctag/twitter/software") "/eu/dnetlib/dhp/eosctag/twitter/")
.getPath(); .getPath();
SparkBulkTagJob SparkBulkTagJob
.main( .main(
new String[] { new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath, "-sourcePath", sourcePath,
"-taggingConf", taggingConf, "-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
"-outputPath", workingDir.toString() + "/software", "-outputPath", workingDir.toString() + "/",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap "-pathMap", pathMap
}); });
@ -1455,19 +1450,19 @@ public class BulkTagJobTest {
void EoscContextTagTest() throws Exception { void EoscContextTagTest() throws Exception {
final String sourcePath = getClass() final String sourcePath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/bulktag/eosc/dataset/dataset_10.json") "/eu/dnetlib/dhp/bulktag/eosc/dataset/")
.getPath(); .getPath();
SparkBulkTagJob SparkBulkTagJob
.main( .main(
new String[] { new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath, "-sourcePath", sourcePath,
"-taggingConf", taggingConf, "-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset", "-outputPath", workingDir.toString() + "/",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap "-pathMap", pathMap
}); });
@ -1533,16 +1528,16 @@ public class BulkTagJobTest {
SparkBulkTagJob SparkBulkTagJob
.main( .main(
new String[] { new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", "-sourcePath",
getClass() getClass()
.getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_datasourcewithconstraints") .getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_datasourcewithconstraints/")
.getPath(), .getPath(),
"-taggingConf", taggingConf, "-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset", "-outputPath", workingDir.toString() + "/",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap "-pathMap", pathMap
}); });
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -1574,14 +1569,14 @@ public class BulkTagJobTest {
SparkBulkTagJob SparkBulkTagJob
.main( .main(
new String[] { new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", "-sourcePath",
getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/no_updates").getPath(), getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/no_updates/").getPath(),
"-taggingConf", taggingConf, "-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset", "-outputPath", workingDir.toString() + "/",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL, "-production", Boolean.TRUE.toString(),
"-pathMap", pathMap "-pathMap", pathMap
}); });

View File

@ -26,7 +26,7 @@ public class QueryCommunityAPITest {
@Test @Test
void communityList() throws Exception { void communityList() throws Exception {
String body = QueryCommunityAPI.communities(); String body = QueryCommunityAPI.communities(true);
new ObjectMapper() new ObjectMapper()
.readValue(body, CommunitySummary.class) .readValue(body, CommunitySummary.class)
.forEach(p -> { .forEach(p -> {
@ -41,7 +41,7 @@ public class QueryCommunityAPITest {
@Test @Test
void community() throws Exception { void community() throws Exception {
String id = "dh-ch"; String id = "dh-ch";
String body = QueryCommunityAPI.community(id); String body = QueryCommunityAPI.community(id, true);
System.out System.out
.println( .println(
new ObjectMapper() new ObjectMapper()
@ -53,7 +53,7 @@ public class QueryCommunityAPITest {
@Test @Test
void communityDatasource() throws Exception { void communityDatasource() throws Exception {
String id = "dh-ch"; String id = "dh-ch";
String body = QueryCommunityAPI.communityDatasource(id); String body = QueryCommunityAPI.communityDatasource(id, true);
new ObjectMapper() new ObjectMapper()
.readValue(body, DatasourceList.class) .readValue(body, DatasourceList.class)
.forEach(ds -> { .forEach(ds -> {
@ -68,7 +68,7 @@ public class QueryCommunityAPITest {
@Test @Test
void validCommunities() throws Exception { void validCommunities() throws Exception {
CommunityConfiguration cc = Utils.getCommunityConfiguration(); CommunityConfiguration cc = Utils.getCommunityConfiguration(true);
System.out.println(cc.getCommunities().keySet()); System.out.println(cc.getCommunities().keySet());
Community community = cc.getCommunities().get("aurora"); Community community = cc.getCommunities().get("aurora");
Assertions.assertEquals(0, community.getSubjects().size()); Assertions.assertEquals(0, community.getSubjects().size());
@ -84,11 +84,20 @@ public class QueryCommunityAPITest {
Assertions Assertions
.assertEquals( .assertEquals(
35, community.getProviders().stream().filter(p -> p.getSelectionConstraints() == null).count()); 35, community.getProviders().stream().filter(p -> p.getSelectionConstraints() == null).count());
}
@Test
void eutopiaCommunityConfiguration() throws Exception {
CommunityConfiguration cc = Utils.getCommunityConfiguration(true);
System.out.println(cc.getCommunities().keySet());
Community community = cc.getCommunities().get("eutopia");
community.getProviders().forEach(p -> System.out.println(p.getOpenaireId()));
} }
@Test @Test
void getCommunityProjects() throws Exception { void getCommunityProjects() throws Exception {
CommunityEntityMap projectMap = Utils.getCommunityProjects(); CommunityEntityMap projectMap = Utils.getCommunityProjects(true);
Assertions.assertFalse(projectMap.containsKey("mes")); Assertions.assertFalse(projectMap.containsKey("mes"));
Assertions.assertEquals(33, projectMap.size()); Assertions.assertEquals(33, projectMap.size());
Assertions Assertions

View File

@ -78,7 +78,7 @@ public class ResultToCommunityJobTest {
.getResource("/eu/dnetlib/dhp/resulttocommunityfromorganization/sample") .getResource("/eu/dnetlib/dhp/resulttocommunityfromorganization/sample")
.getPath(), .getPath(),
"-hive_metastore_uris", "", "-hive_metastore_uris", "",
"-saveGraph", "true",
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset", "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset", "-outputPath", workingDir.toString() + "/dataset",
"-preparedInfoPath", preparedInfoPath "-preparedInfoPath", preparedInfoPath

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long