Compare commits

...

15 Commits

Author SHA1 Message Date
Miriam Baglioni 5c5a195e97 refactoring and fixing issue on property name 2023-10-23 11:26:17 +02:00
Miriam Baglioni 70b78a40c7 removed file from different propagation 2023-10-20 15:50:49 +02:00
Miriam Baglioni f206ff42d6 modified code to use the API. Removed unneeded parameters. Rewrote the code to exploit the parallel stream on the entity types 2023-10-20 15:49:41 +02:00
Miriam Baglioni 34358afe75 modified resource file, workflow and default-config. Added 3g of memory overhead and specified the shuffle partitions in the wf configuration. Removed the multiple instantiation in the wf because of the different implementation of the spark job 2023-10-20 15:48:27 +02:00
Miriam Baglioni 18bfff8af3 adding test classes and modifying test for bulktag 2023-10-20 15:47:03 +02:00
Miriam Baglioni 69dac91659 adding the new code to use the API instead of the Information Service 2023-10-20 15:45:52 +02:00
Miriam Baglioni a9ede1e989 Merge branch 'master' of https://code-repo.d4science.org/D-Net/dnet-hadoop 2023-10-20 10:14:43 +02:00
Miriam Baglioni 599828ce35 Merge branch 'master' of https://code-repo.d4science.org/D-Net/dnet-hadoop 2023-08-09 13:07:13 +02:00
Miriam Baglioni 9e8e39f78a - 2023-07-19 11:35:58 +02:00
Miriam Baglioni 8621377917 [UsageCount] fixed typo in attribute name for datasource table 2023-06-30 19:02:44 +02:00
Miriam Baglioni ef2dd7a980 resolved conflicts 2023-06-30 18:59:47 +02:00
Miriam Baglioni e4b27182d0 [master] refactoring 2023-06-21 11:15:53 +02:00
Miriam Baglioni d9506035e4 [ZenodoApi] gone back to okhttp3 to send the payload. 2023-06-09 12:05:02 +02:00
Miriam Baglioni b64a5eb4a5 Merge branch 'master' of https://code-repo.d4science.org/D-Net/dnet-hadoop 2023-05-24 15:21:58 +02:00
Miriam Baglioni 9fc8ebe98b refactoring 2023-04-19 09:32:13 +02:00
25 changed files with 906 additions and 333 deletions

View File

@ -15,7 +15,7 @@ import org.junit.jupiter.api.Test;
class ZenodoAPIClientTest {
private final String URL_STRING = "https://sandbox.zenodo.org/api/deposit/depositions";
private final String ACCESS_TOKEN = "";
// Credentials must never be committed: the sandbox token is taken from the
// ZENODO_ACCESS_TOKEN environment variable and falls back to the empty string when unset.
private final String ACCESS_TOKEN = System.getenv().getOrDefault("ZENODO_ACCESS_TOKEN", "");
private final String CONCEPT_REC_ID = "657113";
@ -51,18 +51,18 @@ class ZenodoAPIClientTest {
Assertions.assertEquals(201, client.newDeposition());
File file = new File(getClass()
.getResource("/eu/dnetlib/dhp/common/api/COVID-19.json.gz")
.getResource("/eu/dnetlib/dhp/common/api/newVersion")
.getPath());
InputStream is = new FileInputStream(file);
Assertions.assertEquals(200, client.uploadIS(is, "COVID-19.json.gz"));
// Assertions.assertEquals(200, client.uploadIS(is, "COVID-19.json.gz"));
String metadata = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/common/api/metadata.json"));
Assertions.assertEquals(200, client.sendMretadata(metadata));
Assertions.assertEquals(202, client.publish());
// Assertions.assertEquals(202, client.publish());
}
@ -106,4 +106,23 @@ class ZenodoAPIClientTest {
}
/**
 * Manual test uploading a large archive to a new deposition.
 *
 * The archive lives on a developer machine and is not part of the repository, so the test is
 * skipped (not failed) when the file is missing. The check runs before any remote call, so no
 * deposition is created when the test cannot proceed.
 */
@Test
void depositBigFile() throws MissingConceptDoiException, IOException {
	File file = new File("/Users/miriam.baglioni/Desktop/EOSC_DUMP/publication.tar");
	org.junit.jupiter.api.Assumptions
		.assumeTrue(file.exists(), "local archive not available: " + file.getAbsolutePath());

	ZenodoAPIClient client = new ZenodoAPIClient(URL_STRING,
		ACCESS_TOKEN);

	Assertions.assertEquals(201, client.newDeposition());

	// try-with-resources: the stream is released even when the upload assertion fails
	try (InputStream is = new FileInputStream(file)) {
		Assertions.assertEquals(200, client.uploadIS(is, "newVersion_deposition"));
	}

	// publishing is deliberately not exercised here
}
}

View File

@ -0,0 +1,83 @@
package eu.dnetlib.dhp.api;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import org.jetbrains.annotations.NotNull;
/**
 * Minimal HTTP client for the OpenAIRE community API.
 *
 * Every public method issues a single GET request against either the production or the beta
 * base URL and returns the response body as a string. Any response code other than 200 is
 * reported as an {@link IOException} whose message contains the code followed by the body.
 *
 * @author miriam.baglioni
 * @Date 06/10/23
 */
public class QueryCommunityAPI {
	private static final String PRODUCTION_BASE_URL = "https://services.openaire.eu/openaire/";
	private static final String BETA_BASE_URL = "https://beta.services.openaire.eu/openaire/";

	/** Selects the base URL of the environment to query. */
	private static String baseUrl(boolean production) {
		return production ? PRODUCTION_BASE_URL : BETA_BASE_URL;
	}

	/**
	 * Executes a GET request and returns the response body.
	 *
	 * @param geturl the absolute URL to fetch
	 * @return the response body
	 * @throws IOException on connection problems or when the response code is not 200
	 */
	private static String get(String geturl) throws IOException {
		URL url = new URL(geturl);
		HttpURLConnection conn = (HttpURLConnection) url.openConnection();
		try {
			// no request body is sent, so setDoOutput is not needed for a GET
			conn.setRequestMethod("GET");

			int responseCode = conn.getResponseCode();
			String body = getBody(conn);
			if (responseCode != HttpURLConnection.HTTP_OK)
				throw new IOException("Unexpected code " + responseCode + body);

			return body;
		} finally {
			// release the connection on the failure paths too
			conn.disconnect();
		}
	}

	/**
	 * @param production true to query the production API, false for beta
	 * @return the body returned by the {@code community/communities} endpoint
	 * @throws IOException if the request fails
	 */
	public static String communities(boolean production) throws IOException {
		return get(baseUrl(production) + "community/communities");
	}

	/**
	 * @param id the community identifier
	 * @param production true to query the production API, false for beta
	 * @return the body returned by the {@code community/<id>} endpoint
	 * @throws IOException if the request fails
	 */
	public static String community(String id, boolean production) throws IOException {
		return get(baseUrl(production) + "community/" + id);
	}

	/**
	 * @param id the community identifier
	 * @param production true to query the production API, false for beta
	 * @return the body returned by the {@code community/<id>/contentproviders} endpoint
	 * @throws IOException if the request fails
	 */
	public static String communityDatasource(String id, boolean production) throws IOException {
		// the beta branch used to return the URL itself instead of performing the request
		return get(baseUrl(production) + "community/" + id + "/contentproviders");
	}

	/**
	 * @param id the community identifier
	 * @param production true to query the production API, false for beta
	 * @return the body returned by the {@code community/<id>/propagationOrganizations} endpoint
	 * @throws IOException if the request fails
	 */
	public static String communityPropagationOrganization(String id, boolean production) throws IOException {
		return get(baseUrl(production) + "community/" + id + "/propagationOrganizations");
	}

	/**
	 * @param id the community identifier
	 * @param page the page path segment
	 * @param size the page-size path segment
	 * @param production true to query the production API, false for beta
	 * @return the body returned by the {@code community/<id>/projects/<page>/<size>} endpoint
	 * @throws IOException if the request fails
	 */
	public static String communityProjects(String id, String page, String size, boolean production)
		throws IOException {
		return get(baseUrl(production) + "community/" + id + "/projects/" + page + "/" + size);
	}

	/**
	 * Reads the whole response body, trimming each line and concatenating them.
	 *
	 * For response codes of 400 and above the error stream is read instead of the input stream,
	 * because {@code getInputStream()} throws for those codes and the server-provided error
	 * body would otherwise be lost.
	 *
	 * @return the body, or the empty string when the server sent no error body
	 */
	@NotNull
	private static String getBody(HttpURLConnection conn) throws IOException {
		java.io.InputStream stream = conn.getResponseCode() >= HttpURLConnection.HTTP_BAD_REQUEST
			? conn.getErrorStream()
			: conn.getInputStream();
		if (stream == null)
			return "";

		try (BufferedReader br = new BufferedReader(
			new InputStreamReader(stream, java.nio.charset.StandardCharsets.UTF_8))) {
			StringBuilder response = new StringBuilder();
			String responseLine;
			while ((responseLine = br.readLine()) != null) {
				response.append(responseLine.trim());
			}
			return response.toString();
		}
	}
}

View File

@ -0,0 +1,169 @@
package eu.dnetlib.dhp.api;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.management.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.util.StringUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.api.model.*;
import eu.dnetlib.dhp.bulktag.community.Community;
import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration;
import eu.dnetlib.dhp.bulktag.community.Provider;
import eu.dnetlib.dhp.bulktag.criteria.VerbResolver;
import eu.dnetlib.dhp.bulktag.criteria.VerbResolverFactory;
import eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob;
/**
 * Builds the community lookup structures used by the enrichment jobs from the responses of
 * {@link QueryCommunityAPI}.
 *
 * All methods propagate the {@link IOException} raised by the API call or by the JSON parsing.
 *
 * @author miriam.baglioni
 * @Date 09/10/23
 */
public class Utils implements Serializable {
	private static final ObjectMapper MAPPER = new ObjectMapper();
	private static final VerbResolver resolver = VerbResolverFactory.newInstance();
	private static final Logger log = LoggerFactory.getLogger(Utils.class);

	/**
	 * Creates the tagging configuration: one {@link Community} for each valid community, with
	 * its enabled providers attached. Communities for which {@code isValid()} is false are
	 * left out.
	 *
	 * @param production true to query the production API, false for beta
	 * @return the configuration indexed by community id
	 * @throws IOException if an API call or the parsing of its response fails
	 */
	public static CommunityConfiguration getCommunityConfiguration(boolean production) throws IOException {
		final Map<String, Community> communities = Maps.newHashMap();

		for (CommunityModel summary : getValidCommunities(production)) {
			CommunityModel cm = MAPPER
				.readValue(QueryCommunityAPI.community(summary.getId(), production), CommunityModel.class);
			Community community = getCommunity(cm);
			community.setProviders(getProviders(community.getId(), production));
			if (community.isValid())
				communities.put(community.getId(), community);
		}

		return new CommunityConfiguration(communities);
	}

	/** Fetches the content providers of a community and converts the enabled ones. */
	private static List<Provider> getProviders(String communityId, boolean production) throws IOException {
		DatasourceList dl = MAPPER
			.readValue(QueryCommunityAPI.communityDatasource(communityId, production), DatasourceList.class);

		List<Provider> providers = new ArrayList<>();
		for (CommunityContentprovider d : dl) {
			// "enabled" is a String: comparing it with Boolean.FALSE never matched, so disabled
			// providers were kept. parseBoolean also treats a missing value as disabled.
			if (!Boolean.parseBoolean(d.getEnabled()))
				continue;
			Provider p = new Provider();
			p.setOpenaireId("10|" + d.getOpenaireId());
			p.setSelectionConstraints(d.getSelectioncriteria());
			if (p.getSelectionConstraints() != null)
				p.getSelectionConstraints().setSelection(resolver);
			providers.add(p);
		}
		return providers;
	}

	/** Returns a fresh, modifiable copy of the list; a null list becomes an empty one. */
	private static List<String> mutableCopy(List<String> values) {
		return values == null ? new ArrayList<>() : new ArrayList<>(values);
	}

	/**
	 * Converts the API model into the bulk-tagging {@link Community}.
	 *
	 * Lists are copied so that absent (null) properties cannot cause a NullPointerException
	 * and the lists owned by the model are not modified.
	 */
	private static Community getCommunity(CommunityModel cm) {
		Community c = new Community();
		c.setId(cm.getId());

		List<String> zenodoCommunities = mutableCopy(cm.getOtherZenodoCommunities());
		if (!StringUtils.isNullOrEmpty(cm.getZenodoCommunity()))
			zenodoCommunities.add(cm.getZenodoCommunity());
		c.setZenodoCommunities(zenodoCommunities);

		// subjects, fos and sdg all end up in the subject list
		List<String> subjects = mutableCopy(cm.getSubjects());
		subjects.addAll(mutableCopy(cm.getFos()));
		subjects.addAll(mutableCopy(cm.getSdg()));
		c.setSubjects(subjects);

		if (cm.getAdvancedConstraints() != null) {
			c.setConstraints(cm.getAdvancedConstraints());
			c.getConstraints().setSelection(resolver);
		}
		if (cm.getRemoveConstraints() != null) {
			c.setRemoveConstraints(cm.getRemoveConstraints());
			c.getRemoveConstraints().setSelection(resolver);
		}
		return c;
	}

	/**
	 * Lists the communities whose status is not "hidden" and whose type is "ri" or "community".
	 * Entries without a type are discarded; entries without a status are not considered hidden.
	 *
	 * @param production true to query the production API, false for beta
	 * @return the communities passing the filter
	 * @throws IOException if the API call or the parsing of its response fails
	 */
	public static List<CommunityModel> getValidCommunities(boolean production) throws IOException {
		return MAPPER
			.readValue(QueryCommunityAPI.communities(production), CommunitySummary.class)
			.stream()
			.filter(
				community -> !"hidden".equals(community.getStatus()) &&
					("ri".equals(community.getType()) || "community".equals(community.getType())))
			.collect(Collectors.toList());
	}

	/**
	 * it returns for each organization the list of associated communities
	 *
	 * @param production true to query the production API, false for beta
	 * @return map from the organization id (prefixed with "20|") to the community ids
	 * @throws IOException if an API call or the parsing of its response fails
	 */
	public static CommunityEntityMap getCommunityOrganization(boolean production) throws IOException {
		CommunityEntityMap organizationMap = new CommunityEntityMap();

		for (CommunityModel community : getValidCommunities(production)) {
			String id = community.getId();
			List<String> associatedOrgs = MAPPER
				.readValue(
					QueryCommunityAPI.communityPropagationOrganization(id, production), OrganizationList.class);
			for (String o : associatedOrgs)
				organizationMap.computeIfAbsent("20|" + o, k -> new ArrayList<>()).add(id);
		}

		return organizationMap;
	}

	/**
	 * it returns for each project the list of associated communities, reading the projects
	 * of every valid community page by page (100 per page)
	 *
	 * @param production true to query the production API, false for beta
	 * @return map from the project id (prefixed with "40|") to the community ids
	 * @throws IOException if an API call or the parsing of its response fails
	 */
	public static CommunityEntityMap getCommunityProjects(boolean production) throws IOException {
		CommunityEntityMap projectMap = new CommunityEntityMap();
		final int size = 100;

		for (CommunityModel community : getValidCommunities(production)) {
			int page = 0;
			ContentModel cm;
			do {
				cm = MAPPER
					.readValue(
						QueryCommunityAPI
							.communityProjects(
								community.getId(), String.valueOf(page), String.valueOf(size), production),
						ContentModel.class);
				if (cm.getContent() != null) {
					for (ProjectModel p : cm.getContent())
						projectMap
							.computeIfAbsent("40|" + p.getOpenaireId(), k -> new ArrayList<>())
							.add(community.getId());
				}
				page++;
				// go on only when the response explicitly says this was not the last page: a
				// missing "last" flag stops the loop instead of failing on unboxing
			} while (Boolean.FALSE.equals(cm.getLast()));
		}

		return projectMap;
	}
}

View File

@ -0,0 +1,43 @@
package eu.dnetlib.dhp.api.model;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.google.gson.Gson;
import eu.dnetlib.dhp.bulktag.community.SelectionConstraints;
/**
 * Bean describing one content provider entry; it is the element type of {@code DatasourceList}.
 * JSON properties not declared here are ignored during deserialization.
 */
@JsonAutoDetect
@JsonIgnoreProperties(ignoreUnknown = true)
public class CommunityContentprovider {
	private String openaireId;
	private SelectionConstraints selectioncriteria;

	// kept as the raw string value
	private String enabled;

	/** @return the provider identifier */
	public String getOpenaireId() {
		return this.openaireId;
	}

	/** @param openaireId the provider identifier */
	public void setOpenaireId(final String openaireId) {
		this.openaireId = openaireId;
	}

	/** @return the selection constraints of this provider, possibly null */
	public SelectionConstraints getSelectioncriteria() {
		return this.selectioncriteria;
	}

	/** @param selectioncriteria the selection constraints of this provider */
	public void setSelectioncriteria(final SelectionConstraints selectioncriteria) {
		this.selectioncriteria = selectioncriteria;
	}

	/** @return the enabled flag as a string, possibly null */
	public String getEnabled() {
		return this.enabled;
	}

	/** @param enabled the enabled flag as a string */
	public void setEnabled(final String enabled) {
		this.enabled = enabled;
	}
}

View File

@ -0,0 +1,21 @@
package eu.dnetlib.dhp.api.model;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
 * Map from an entity identifier to a list of strings.
 */
public class CommunityEntityMap extends HashMap<String, List<String>> {

	public CommunityEntityMap() {
		super();
	}

	/**
	 * Null-safe lookup by string key.
	 *
	 * Note that this method takes a {@code String} and is therefore an overload of
	 * {@code HashMap.get(Object)}, not an override of it.
	 *
	 * @param key the entity identifier
	 * @return the stored list, or a new empty list (not added to the map) when the key is
	 *         absent or mapped to null
	 */
	public List<String> get(String key) {
		final List<String> stored = super.get(key);
		return stored == null ? new ArrayList<>() : stored;
	}
}

View File

@ -0,0 +1,108 @@
package eu.dnetlib.dhp.api.model;
import java.io.Serializable;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import eu.dnetlib.dhp.bulktag.community.SelectionConstraints;
/**
 * Bean holding the properties of a community read from JSON; properties not declared here are
 * ignored during deserialization.
 *
 * @author miriam.baglioni
 * @Date 06/10/23
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class CommunityModel implements Serializable {
	private String id;
	private String type;
	private String status;
	private String zenodoCommunity;
	private List<String> subjects;
	private List<String> otherZenodoCommunities;
	private List<String> fos;
	private List<String> sdg;
	private SelectionConstraints advancedConstraints;
	private SelectionConstraints removeConstraints;

	/** @return the community identifier */
	public String getId() {
		return this.id;
	}

	/** @param id the community identifier */
	public void setId(final String id) {
		this.id = id;
	}

	/** @return the community type */
	public String getType() {
		return this.type;
	}

	/** @param type the community type */
	public void setType(final String type) {
		this.type = type;
	}

	/** @return the community status */
	public String getStatus() {
		return this.status;
	}

	/** @param status the community status */
	public void setStatus(final String status) {
		this.status = status;
	}

	/** @return the Zenodo community, possibly null */
	public String getZenodoCommunity() {
		return this.zenodoCommunity;
	}

	/** @param zenodoCommunity the Zenodo community */
	public void setZenodoCommunity(final String zenodoCommunity) {
		this.zenodoCommunity = zenodoCommunity;
	}

	/** @return the subjects, possibly null */
	public List<String> getSubjects() {
		return this.subjects;
	}

	/** @param subjects the subjects */
	public void setSubjects(final List<String> subjects) {
		this.subjects = subjects;
	}

	/** @return the other Zenodo communities, possibly null */
	public List<String> getOtherZenodoCommunities() {
		return this.otherZenodoCommunities;
	}

	/** @param otherZenodoCommunities the other Zenodo communities */
	public void setOtherZenodoCommunities(final List<String> otherZenodoCommunities) {
		this.otherZenodoCommunities = otherZenodoCommunities;
	}

	/** @return the fos values, possibly null */
	public List<String> getFos() {
		return this.fos;
	}

	/** @param fos the fos values */
	public void setFos(final List<String> fos) {
		this.fos = fos;
	}

	/** @return the sdg values, possibly null */
	public List<String> getSdg() {
		return this.sdg;
	}

	/** @param sdg the sdg values */
	public void setSdg(final List<String> sdg) {
		this.sdg = sdg;
	}

	/** @return the advanced constraints, possibly null */
	public SelectionConstraints getAdvancedConstraints() {
		return this.advancedConstraints;
	}

	/** @param advancedConstraints the advanced constraints */
	public void setAdvancedConstraints(final SelectionConstraints advancedConstraints) {
		this.advancedConstraints = advancedConstraints;
	}

	/** @return the remove constraints, possibly null */
	public SelectionConstraints getRemoveConstraints() {
		return this.removeConstraints;
	}

	/** @param removeConstraints the remove constraints */
	public void setRemoveConstraints(final SelectionConstraints removeConstraints) {
		this.removeConstraints = removeConstraints;
	}
}

View File

@ -0,0 +1,15 @@
package eu.dnetlib.dhp.api.model;
import java.io.Serializable;
import java.util.ArrayList;
/**
 * Serializable list of {@link CommunityModel}, usable as a concrete (non-generic) target type
 * when deserializing a JSON array.
 *
 * @author miriam.baglioni
 * @Date 06/10/23
 */
public class CommunitySummary extends ArrayList<CommunityModel> implements Serializable {

	/** Creates an empty list. */
	public CommunitySummary() {
	}
}

View File

@ -0,0 +1,51 @@
package eu.dnetlib.dhp.api.model;
import java.io.Serializable;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
/**
 * Bean holding one page of {@link ProjectModel} entries together with its paging information;
 * JSON properties not declared here are ignored during deserialization.
 *
 * @author miriam.baglioni
 * @Date 09/10/23
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class ContentModel implements Serializable {
	private List<ProjectModel> content;
	private Integer totalPages;
	private Boolean last;
	private Integer number;

	/** @return the entries of this page, possibly null */
	public List<ProjectModel> getContent() {
		return this.content;
	}

	/** @param content the entries of this page */
	public void setContent(final List<ProjectModel> content) {
		this.content = content;
	}

	/** @return the total number of pages, possibly null */
	public Integer getTotalPages() {
		return this.totalPages;
	}

	/** @param totalPages the total number of pages */
	public void setTotalPages(final Integer totalPages) {
		this.totalPages = totalPages;
	}

	/** @return the "last" flag of this page, possibly null */
	public Boolean getLast() {
		return this.last;
	}

	/** @param last the "last" flag of this page */
	public void setLast(final Boolean last) {
		this.last = last;
	}

	/** @return the "number" value of this page, possibly null */
	public Integer getNumber() {
		return this.number;
	}

	/** @param number the "number" value of this page */
	public void setNumber(final Integer number) {
		this.number = number;
	}
}

View File

@ -0,0 +1,13 @@
package eu.dnetlib.dhp.api.model;
import java.io.Serializable;
import java.util.ArrayList;
import eu.dnetlib.dhp.api.model.CommunityContentprovider;
/**
 * Serializable list of {@link CommunityContentprovider}, usable as a concrete (non-generic)
 * target type when deserializing a JSON array.
 */
public class DatasourceList extends ArrayList<CommunityContentprovider> implements Serializable {

	/** Creates an empty list. */
	public DatasourceList() {
	}
}

View File

@ -0,0 +1,16 @@
package eu.dnetlib.dhp.api.model;
import java.io.Serializable;
import java.util.ArrayList;
/**
 * Serializable list of strings, usable as a concrete (non-generic) target type when
 * deserializing a JSON array.
 *
 * @author miriam.baglioni
 * @Date 09/10/23
 */
public class OrganizationList extends ArrayList<String> implements Serializable {

	/** Creates an empty list. */
	public OrganizationList() {
	}
}

View File

@ -0,0 +1,24 @@
package eu.dnetlib.dhp.api.model;
import java.io.Serializable;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
/**
 * Bean exposing the identifier of a project; JSON properties not declared here are ignored
 * during deserialization.
 *
 * @author miriam.baglioni
 * @Date 09/10/23
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class ProjectModel implements Serializable {
	private String openaireId;

	/** @return the project identifier, possibly null */
	public String getOpenaireId() {
		return this.openaireId;
	}

	/** @param openaireId the project identifier */
	public void setOpenaireId(final String openaireId) {
		this.openaireId = openaireId;
	}
}

View File

@ -9,7 +9,6 @@ import java.util.*;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@ -21,8 +20,11 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.bulktag.community.*;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Result;
@ -53,50 +55,38 @@ public class SparkBulkTagJob {
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
Boolean isTest = Optional
.ofNullable(parser.get("isTest"))
.map(Boolean::valueOf)
.orElse(Boolean.FALSE);
log.info("isTest: {} ", isTest);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final boolean production = Boolean.valueOf(parser.get("production"));
log.info("production: {}", production);
ProtoMap protoMappingParams = new Gson().fromJson(parser.get("pathMap"), ProtoMap.class);
log.info("pathMap: {}", new Gson().toJson(protoMappingParams));
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final Boolean saveGraph = Optional
.ofNullable(parser.get("saveGraph"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("saveGraph: {}", saveGraph);
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf();
CommunityConfiguration cc;
String taggingConf = parser.get("taggingConf");
String taggingConf = Optional
.ofNullable(parser.get("taggingConf"))
.map(String::valueOf)
.orElse(null);
if (isTest) {
if (taggingConf != null) {
cc = CommunityConfigurationFactory.newInstance(taggingConf);
} else {
cc = QueryInformationSystem.getCommunityConfiguration(parser.get("isLookUpUrl"));
cc = Utils.getCommunityConfiguration(production);
}
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
extendCommunityConfigurationForEOSC(spark, inputPath, cc);
execBulkTag(spark, inputPath, outputPath, protoMappingParams, resultClazz, cc);
execBulkTag(spark, inputPath, outputPath, protoMappingParams, cc);
});
}
@ -105,10 +95,7 @@ public class SparkBulkTagJob {
Dataset<String> datasources = readPath(
spark, inputPath
.substring(
0,
inputPath.lastIndexOf("/"))
+ "/datasource",
+ "datasource",
Datasource.class)
.filter((FilterFunction<Datasource>) ds -> isOKDatasource(ds))
.map((MapFunction<Datasource, String>) ds -> ds.getId(), Encoders.STRING());
@ -116,10 +103,10 @@ public class SparkBulkTagJob {
Map<String, List<Pair<String, SelectionConstraints>>> dsm = cc.getEoscDatasourceMap();
for (String ds : datasources.collectAsList()) {
final String dsId = ds.substring(3);
if (!dsm.containsKey(dsId)) {
// final String dsId = ds.substring(3);
if (!dsm.containsKey(ds)) {
ArrayList<Pair<String, SelectionConstraints>> eoscList = new ArrayList<>();
dsm.put(dsId, eoscList);
dsm.put(ds, eoscList);
}
}
@ -141,22 +128,30 @@ public class SparkBulkTagJob {
String inputPath,
String outputPath,
ProtoMap protoMappingParams,
Class<R> resultClazz,
CommunityConfiguration communityConfiguration) {
ResultTagger resultTagger = new ResultTagger();
readPath(spark, inputPath, resultClazz)
.map(patchResult(), Encoders.bean(resultClazz))
.filter(Objects::nonNull)
.map(
(MapFunction<R, R>) value -> resultTagger
.enrichContextCriteria(
value, communityConfiguration, protoMappingParams),
Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
ModelSupport.entityTypes
.keySet()
.parallelStream()
.filter(e -> ModelSupport.isResult(e))
.forEach(e -> {
removeOutputDir(spark, outputPath + e.name());
ResultTagger resultTagger = new ResultTagger();
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
readPath(spark, inputPath + e.name(), resultClazz)
.map(patchResult(), Encoders.bean(resultClazz))
.filter(Objects::nonNull)
.map(
(MapFunction<R, R>) value -> resultTagger
.enrichContextCriteria(
value, communityConfiguration, protoMappingParams),
Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + e.name());
});
}
public static <R> Dataset<R> readPath(

View File

@ -4,6 +4,7 @@ package eu.dnetlib.dhp.bulktag.community;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import com.google.gson.Gson;
@ -13,7 +14,7 @@ public class Community implements Serializable {
private String id;
private List<String> subjects = new ArrayList<>();
private List<Provider> providers = new ArrayList<>();
private List<ZenodoCommunity> zenodoCommunities = new ArrayList<>();
private List<String> zenodoCommunities = new ArrayList<>();
private SelectionConstraints constraints = new SelectionConstraints();
private SelectionConstraints removeConstraints = new SelectionConstraints();
@ -26,7 +27,7 @@ public class Community implements Serializable {
return !getSubjects().isEmpty()
|| !getProviders().isEmpty()
|| !getZenodoCommunities().isEmpty()
|| getConstraints().getCriteria() != null;
|| (Optional.ofNullable(getConstraints()).isPresent() && getConstraints().getCriteria() != null);
}
public String getId() {
@ -53,11 +54,11 @@ public class Community implements Serializable {
this.providers = providers;
}
public List<ZenodoCommunity> getZenodoCommunities() {
public List<String> getZenodoCommunities() {
return zenodoCommunities;
}
public void setZenodoCommunities(List<ZenodoCommunity> zenodoCommunities) {
public void setZenodoCommunities(List<String> zenodoCommunities) {
this.zenodoCommunities = zenodoCommunities;
}

View File

@ -81,7 +81,7 @@ public class CommunityConfiguration implements Serializable {
this.removeConstraintsMap = removeConstraintsMap;
}
CommunityConfiguration(final Map<String, Community> communities) {
public CommunityConfiguration(final Map<String, Community> communities) {
this.communities = communities;
init();
}
@ -117,10 +117,10 @@ public class CommunityConfiguration implements Serializable {
add(d.getOpenaireId(), new Pair<>(id, d.getSelectionConstraints()), datasourceMap);
}
// get zenodo communities
for (ZenodoCommunity zc : c.getZenodoCommunities()) {
for (String zc : c.getZenodoCommunities()) {
add(
zc.getZenodoCommunityId(),
new Pair<>(id, zc.getSelCriteria()),
zc,
new Pair<>(id, null),
zenodocommunityMap);
}
selectionConstraintsMap.put(id, c.getConstraints());

View File

@ -143,16 +143,16 @@ public class CommunityConfigurationFactory {
return providerList;
}
private static List<ZenodoCommunity> parseZenodoCommunities(final Node node) {
private static List<String> parseZenodoCommunities(final Node node) {
final List<Node> list = node.selectNodes("./zenodocommunities/zenodocommunity");
final List<ZenodoCommunity> zenodoCommunityList = new ArrayList<>();
final List<String> zenodoCommunityList = new ArrayList<>();
for (Node n : list) {
ZenodoCommunity zc = new ZenodoCommunity();
zc.setZenodoCommunityId(n.selectSingleNode("./zenodoid").getText());
zc.setSelCriteria(n.selectSingleNode("./selcriteria"));
// ZenodoCommunity zc = new ZenodoCommunity();
// zc.setZenodoCommunityId(n.selectSingleNode("./zenodoid").getText());
// zc.setSelCriteria(n.selectSingleNode("./selcriteria"));
zenodoCommunityList.add(zc);
zenodoCommunityList.add(n.selectSingleNode("./zenodoid").getText());
}
log.info("size of the zenodo community list " + zenodoCommunityList.size());

View File

@ -4,6 +4,8 @@ package eu.dnetlib.dhp.bulktag.community;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import org.apache.htrace.fasterxml.jackson.annotation.JsonIgnore;
import eu.dnetlib.dhp.bulktag.criteria.Selection;
import eu.dnetlib.dhp.bulktag.criteria.VerbResolver;
@ -11,7 +13,8 @@ public class Constraint implements Serializable {
private String verb;
private String field;
private String value;
// private String element;
@JsonIgnore
private Selection selection;
public String getVerb() {
@ -38,10 +41,7 @@ public class Constraint implements Serializable {
this.value = value;
}
public void setSelection(Selection sel) {
selection = sel;
}
@JsonIgnore
public void setSelection(VerbResolver resolver)
throws InvocationTargetException, NoSuchMethodException, InstantiationException,
IllegalAccessException {
@ -52,11 +52,4 @@ public class Constraint implements Serializable {
return selection.apply(metadata);
}
// public String getElement() {
// return element;
// }
//
// public void setElement(String element) {
// this.element = element;
// }
}

View File

@ -1,34 +0,0 @@
package eu.dnetlib.dhp.bulktag.community;
import java.io.IOException;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.dom4j.DocumentException;
import org.xml.sax.SAXException;
import com.google.common.base.Joiner;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
public class QueryInformationSystem {
public static CommunityConfiguration getCommunityConfiguration(final String isLookupUrl)
throws ISLookUpException, DocumentException, SAXException, IOException {
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
final List<String> res = isLookUp
.quickSearchProfile(
IOUtils
.toString(
QueryInformationSystem.class
.getResourceAsStream(
"/eu/dnetlib/dhp/bulktag/query.xq")));
final String xmlConf = "<communities>" + Joiner.on(" ").join(res) + "</communities>";
return CommunityConfigurationFactory.newInstance(xmlConf);
}
}

View File

@ -85,16 +85,18 @@ public class ResultTagger implements Serializable {
conf
.getRemoveConstraintsMap()
.keySet()
.forEach(communityId -> {
if (conf.getRemoveConstraintsMap().get(communityId).getCriteria() != null &&
conf
.getRemoveConstraintsMap()
.get(communityId)
.getCriteria()
.stream()
.anyMatch(crit -> crit.verifyCriteria(param)))
removeCommunities.add(communityId);
});
.forEach(
communityId -> {
if (conf.getRemoveConstraintsMap().keySet().contains(communityId) &&
conf.getRemoveConstraintsMap().get(communityId).getCriteria() != null &&
conf
.getRemoveConstraintsMap()
.get(communityId)
.getCriteria()
.stream()
.anyMatch(crit -> crit.verifyCriteria(param)))
removeCommunities.add(communityId);
});
// communities contains all the communities to be added as context for the result
final Set<String> communities = new HashSet<>();
@ -124,10 +126,10 @@ public class ResultTagger implements Serializable {
if (Objects.nonNull(result.getInstance())) {
for (Instance i : result.getInstance()) {
if (Objects.nonNull(i.getCollectedfrom()) && Objects.nonNull(i.getCollectedfrom().getKey())) {
collfrom.add(StringUtils.substringAfter(i.getCollectedfrom().getKey(), "|"));
collfrom.add(i.getCollectedfrom().getKey());
}
if (Objects.nonNull(i.getHostedby()) && Objects.nonNull(i.getHostedby().getKey())) {
hostdby.add(StringUtils.substringAfter(i.getHostedby().getKey(), "|"));
hostdby.add(i.getHostedby().getKey());
}
}

View File

@ -7,11 +7,13 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import eu.dnetlib.dhp.bulktag.criteria.VerbResolver;
@JsonAutoDetect
public class SelectionConstraints implements Serializable {
private List<Constraints> criteria;

View File

@ -1,10 +1,5 @@
[
{
"paramName":"is",
"paramLongName":"isLookUpUrl",
"paramDescription": "URL of the isLookUp Service",
"paramRequired": true
},
{
"paramName":"s",
"paramLongName":"sourcePath",
@ -17,12 +12,7 @@
"paramDescription": "the json path associated to each selection field",
"paramRequired": true
},
{
"paramName":"tn",
"paramLongName":"resultTableName",
"paramDescription": "the name of the result table we are currently working on",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
@ -35,17 +25,19 @@
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},
{
"paramName": "test",
"paramLongName": "isTest",
"paramDescription": "Parameter intended for testing purposes only. True if the reun is relatesd to a test and so the taggingConf parameter should be loaded",
"paramRequired": false
},
{
"paramName": "tg",
"paramLongName": "taggingConf",
"paramDescription": "this parameter is intended for testing purposes only. It is a possible tagging configuration obtained via the XQUERY. Intended to be removed",
"paramRequired": false
},
{
"paramName": "p",
"paramLongName": "production",
"paramDescription": "true to read the community configuration from the production community API, false to read it from the beta one",
"paramRequired": true
}
]

View File

@ -45,10 +45,14 @@
</property>
<property>
<name>sparkExecutorMemory</name>
<value>6G</value>
<value>5G</value>
</property>
<property>
<name>sparkExecutorCores</name>
<value>1</value>
<name>memoryOverhead</name>
<value>3g</value>
</property>
<property>
<name>partitions</name>
<value>3284</value>
</property>
</configuration>

View File

@ -4,10 +4,6 @@
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>isLookUpUrl</name>
<description>the isLookup service endpoint</description>
</property>
<property>
<name>pathMap</name>
<description>the json path associated to each selection field</description>
@ -102,16 +98,9 @@
<error to="Kill"/>
</action>
<join name="copy_wait" to="fork_exec_bulktag"/>
<join name="copy_wait" to="exec_bulktag"/>
<fork name="fork_exec_bulktag">
<path start="bulktag_publication"/>
<path start="bulktag_dataset"/>
<path start="bulktag_otherresearchproduct"/>
<path start="bulktag_software"/>
</fork>
<action name="bulktag_publication">
<action name="exec_bulktag">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
@ -122,104 +111,23 @@
--num-executors=${sparkExecutorNumber}
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--conf spark.executor.memoryOverhead=${memoryOverhead}
--conf spark.sql.shuffle.partitions=${partitions}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/</arg>
<arg>--outputPath</arg><arg>${outputPath}/</arg>
<arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--production</arg><arg>${production}</arg>
</spark>
<ok to="wait"/>
<ok to="End"/>
<error to="Kill"/>
</action>
<action name="bulktag_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>bulkTagging-dataset</name>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
<arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="bulktag_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>bulkTagging-orp</name>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
<arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="bulktag_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>bulkTagging-software</name>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${outputPath}/software</arg>
<arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
</action>
<join name="wait" to="End"/>
<end name="End"/>

View File

@ -0,0 +1,115 @@
package eu.dnetlib.dhp.api;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.api.model.CommunityEntityMap;
import eu.dnetlib.dhp.api.model.CommunityModel;
import eu.dnetlib.dhp.api.model.CommunitySummary;
import eu.dnetlib.dhp.api.model.DatasourceList;
import eu.dnetlib.dhp.bulktag.community.Community;
import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration;
/**
 * Exploratory tests for {@link QueryCommunityAPI} and for the {@link Utils} helpers that
 * build the community configuration and the community/entity maps from its responses.
 *
 * NOTE(review): none of the calls below is mocked, so these tests presumably reach the live
 * community API; the {@code true} argument passed everywhere looks like the "production"
 * switch -- confirm against QueryCommunityAPI/Utils.
 *
 * @author miriam.baglioni
 * @Date 06/10/23
 */
public class QueryCommunityAPITest {

	/** Single mapper shared by all tests instead of one new instance per call. */
	private static final ObjectMapper MAPPER = new ObjectMapper();

	/**
	 * Prints the JSON serialization of {@code value} on standard output.
	 * Usable as a method reference inside {@code forEach}, hence the checked
	 * serialization failure is rethrown as a {@link RuntimeException} keeping its cause.
	 *
	 * @param value the object to serialize and print
	 */
	private static void printAsJson(Object value) {
		try {
			System.out.println(MAPPER.writeValueAsString(value));
		} catch (JsonProcessingException e) {
			throw new RuntimeException(e);
		}
	}

	/** Fetches the list of communities and prints every entry as JSON. */
	@Test
	void communityList() throws Exception {
		String body = QueryCommunityAPI.communities(true);
		MAPPER
			.readValue(body, CommunitySummary.class)
			.forEach(QueryCommunityAPITest::printAsJson);
	}

	/** Fetches a single community, parses it and prints it back as JSON. */
	@Test
	void community() throws Exception {
		String id = "dh-ch";
		String body = QueryCommunityAPI.community(id, true);
		System.out
			.println(
				MAPPER
					.writeValueAsString(
						MAPPER.readValue(body, CommunityModel.class)));
	}

	/** Fetches the datasources of a single community and prints every entry as JSON. */
	@Test
	void communityDatasource() throws Exception {
		String id = "dh-ch";
		String body = QueryCommunityAPI.communityDatasource(id, true);
		MAPPER
			.readValue(body, DatasourceList.class)
			.forEach(QueryCommunityAPITest::printAsJson);
	}

	/**
	 * Checks the configuration built for the "aurora" community.
	 * The expected counts are the values the original test pinned; they depend on
	 * the data returned by the API at the time the test is run.
	 */
	@Test
	void validCommunities() throws Exception {
		CommunityConfiguration cc = Utils.getCommunityConfiguration(true);
		System.out.println(cc.getCommunities().keySet());
		Community community = cc.getCommunities().get("aurora");
		Assertions.assertEquals(0, community.getSubjects().size());
		Assertions.assertNull(community.getConstraints());
		Assertions.assertNull(community.getRemoveConstraints());
		Assertions.assertEquals(2, community.getZenodoCommunities().size());
		Assertions
			.assertTrue(
				community.getZenodoCommunities().stream().anyMatch(c -> c.equals("aurora-universities-network")));
		Assertions
			.assertTrue(community.getZenodoCommunities().stream().anyMatch(c -> c.equals("university-of-innsbruck")));
		Assertions.assertEquals(35, community.getProviders().size());
		Assertions
			.assertEquals(
				35, community.getProviders().stream().filter(p -> p.getSelectionConstraints() == null).count());
	}

	/** Prints the OpenAIRE identifiers of the providers configured for the "eutopia" community. */
	@Test
	void eutopiaCommunityConfiguration() throws Exception {
		CommunityConfiguration cc = Utils.getCommunityConfiguration(true);
		System.out.println(cc.getCommunities().keySet());
		Community community = cc.getCommunities().get("eutopia");
		community.getProviders().forEach(p -> System.out.println(p.getOpenaireId()));
	}

	/** Every key of the community/project map must carry the "40|" prefix. */
	@Test
	void getCommunityProjects() throws Exception {
		CommunityEntityMap projectMap = Utils.getCommunityProjects(true);
		Assertions
			.assertTrue(
				projectMap
					.keySet()
					.stream()
					.allMatch(k -> k.startsWith("40|")));
		System.out.println(projectMap);
	}

	/** Every key of the community/organization map must carry the "20|" prefix. */
	@Test
	void getCommunityOrganizations() throws Exception {
		CommunityEntityMap organizationMap = Utils.getCommunityOrganization(true);
		Assertions.assertTrue(organizationMap.keySet().stream().allMatch(k -> k.startsWith("20|")));
	}
}

View File

@ -6,6 +6,7 @@ import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.ZENODO_COMMUNITY
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.io.FileUtils;
@ -98,14 +99,11 @@ public class BulkTagJobTest {
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath",
getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/no_updates").getPath(),
getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/no_updates/").getPath(),
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -133,19 +131,16 @@ public class BulkTagJobTest {
@Test
void bulktagBySubjectNoPreviousContextTest() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/nocontext")
.getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/nocontext/")
.getPath();
final String pathMap = BulkTagJobTest.pathMap;
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -230,19 +225,19 @@ public class BulkTagJobTest {
void bulktagBySubjectPreviousContextNoProvenanceTest() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/contextnoprovenance")
"/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/contextnoprovenance/")
.getPath();
final String pathMap = BulkTagJobTest.pathMap;
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -311,18 +306,18 @@ public class BulkTagJobTest {
@Test
void bulktagByDatasourceTest() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource")
.getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
"-outputPath", workingDir.toString() + "/publication",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -384,25 +379,25 @@ public class BulkTagJobTest {
void bulktagByZenodoCommunityTest() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/bulktag/sample/otherresearchproduct/update_zenodocommunity")
"/eu/dnetlib/dhp/bulktag/sample/otherresearchproduct/update_zenodocommunity/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
"-outputPath", workingDir.toString() + "/orp",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<OtherResearchProduct> tmp = sc
.textFile(workingDir.toString() + "/orp")
.textFile(workingDir.toString() + "/otherresearchproduct")
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class));
Assertions.assertEquals(10, tmp.count());
@ -505,18 +500,18 @@ public class BulkTagJobTest {
@Test
void bulktagBySubjectDatasourceTest() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject_datasource")
.getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject_datasource/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -636,14 +631,14 @@ public class BulkTagJobTest {
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath",
getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/software/software_10.json.gz").getPath(),
getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/software/").getPath(),
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
"-outputPath", workingDir.toString() + "/software",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -732,18 +727,18 @@ public class BulkTagJobTest {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/bulktag/sample/dataset/update_datasourcewithconstraints")
"/eu/dnetlib/dhp/bulktag/sample/dataset/update_datasourcewithconstraints/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -774,19 +769,19 @@ public class BulkTagJobTest {
void bulkTagOtherJupyter() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/eosctag/jupyter/otherresearchproduct")
"/eu/dnetlib/dhp/eosctag/jupyter/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
"-outputPath", workingDir.toString() + "/otherresearchproduct",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -829,18 +824,18 @@ public class BulkTagJobTest {
public void bulkTagDatasetJupyter() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/eosctag/jupyter/dataset")
"/eu/dnetlib/dhp/eosctag/jupyter/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -878,18 +873,18 @@ public class BulkTagJobTest {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/eosctag/jupyter/software")
"/eu/dnetlib/dhp/eosctag/jupyter/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
"-outputPath", workingDir.toString() + "/software",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -1096,18 +1091,18 @@ public class BulkTagJobTest {
void galaxyOtherTest() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/eosctag/galaxy/otherresearchproduct")
"/eu/dnetlib/dhp/eosctag/galaxy/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
"-outputPath", workingDir.toString() + "/otherresearchproduct",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -1214,18 +1209,18 @@ public class BulkTagJobTest {
void galaxySoftwareTest() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/eosctag/galaxy/software")
"/eu/dnetlib/dhp/eosctag/galaxy/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
"-outputPath", workingDir.toString() + "/software",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -1333,19 +1328,19 @@ public class BulkTagJobTest {
void twitterDatasetTest() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/eosctag/twitter/dataset")
"/eu/dnetlib/dhp/eosctag/twitter/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -1373,19 +1368,19 @@ public class BulkTagJobTest {
void twitterOtherTest() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/eosctag/twitter/otherresearchproduct")
"/eu/dnetlib/dhp/eosctag/twitter/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
"-outputPath", workingDir.toString() + "/otherresearchproduct",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -1418,19 +1413,19 @@ public class BulkTagJobTest {
void twitterSoftwareTest() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/eosctag/twitter/software")
"/eu/dnetlib/dhp/eosctag/twitter/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
"-outputPath", workingDir.toString() + "/software",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -1455,19 +1450,19 @@ public class BulkTagJobTest {
void EoscContextTagTest() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/bulktag/eosc/dataset/dataset_10.json")
"/eu/dnetlib/dhp/bulktag/eosc/dataset/")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
@ -1533,16 +1528,16 @@ public class BulkTagJobTest {
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath",
getClass()
.getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_datasourcewithconstraints")
.getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_datasourcewithconstraints/")
.getPath(),
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -1568,4 +1563,42 @@ public class BulkTagJobTest {
}
@Test
void newConfTest() throws Exception {
	// Run the bulk-tagging job on the "no_updates" dataset sample with the production flag set.
	final String inputPath = getClass()
		.getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/no_updates/")
		.getPath();
	final String[] jobArgs = {
		"-isSparkSessionManaged", Boolean.FALSE.toString(),
		"-sourcePath", inputPath,
		"-taggingConf", taggingConf,
		"-outputPath", workingDir.toString() + "/",
		"-production", Boolean.TRUE.toString(),
		"-pathMap", BulkTagJobTest.pathMap
	};
	SparkBulkTagJob.main(jobArgs);

	// Read back what the job wrote under <workingDir>/dataset.
	final JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
	final JavaRDD<Dataset> taggedDatasets = sparkContext
		.textFile(workingDir.toString() + "/dataset")
		.map(line -> OBJECT_MAPPER.readValue(line, Dataset.class));

	Assertions.assertEquals(10, taggedDatasets.count());

	// Expose the records to Spark SQL to look for contexts whose provenance is bulktagging.
	final org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
		.createDataset(taggedDatasets.rdd(), Encoders.bean(Dataset.class));
	verificationDataset.createOrReplaceTempView("dataset");

	final String bulkTaggedContextsQuery = String
		.join(
			" ",
			"select id, MyT.id community",
			"from dataset",
			"lateral view explode(context) c as MyT",
			"lateral view explode(MyT.datainfo) d as MyD",
			"where MyD.inferenceprovenance = 'bulktagging'");

	// No record of this sample is expected to carry a bulktagging context.
	final long bulkTaggedRows = spark.sql(bulkTaggedContextsQuery).count();
	Assertions.assertEquals(0, bulkTaggedRows);
}
}

View File

@ -47,7 +47,7 @@ class CommunityConfigurationFactoryTest {
sc.setVerb("not_contains");
sc.setField("contributor");
sc.setValue("DARIAH");
sc.setSelection(resolver.getSelectionCriteria(sc.getVerb(), sc.getValue()));
sc.setSelection(resolver);// .getSelectionCriteria(sc.getVerb(), sc.getValue()));
String metadata = "This work has been partially supported by DARIAH-EU infrastructure";
Assertions.assertFalse(sc.verifyCriteria(metadata));
}