forked from D-Net/dnet-hadoop
changes to use the API instead of the IS to get the information for the communities to be used during bulktagging and context propagation
This commit is contained in:
parent f344ad76d0
commit 3d6be20989
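In practice, jobs that used to resolve community information through the IS lookup service now call the new REST client; a minimal before/after sketch (both lines taken from the SparkBulkTagJob hunk below):

	// before: IS lookup, removed in this commit
	cc = QueryInformationSystem.getCommunityConfiguration(parser.get("isLookUpUrl"));
	// after: community REST API
	cc = Utils.getCommunityConfiguration();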
@@ -95,7 +95,7 @@ public class SparkAtomicActionScoreJob implements Serializable {
 		return projectScores.map((MapFunction<BipProjectModel, Project>) bipProjectScores -> {
 			Project project = new Project();
-			project.setId(bipProjectScores.getProjectId());
+			// project.setId(bipProjectScores.getProjectId());
 			project.setMeasures(bipProjectScores.toMeasures());
 			return project;
 		}, Encoders.bean(Project.class))
@@ -67,60 +67,60 @@ public class SparkPropagateRelation extends AbstractSparkAction {
 		log.info("graphOutputPath: '{}'", graphOutputPath);

 		Dataset<Relation> mergeRels = spark
 			.read()
 			.load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
 			.as(REL_BEAN_ENC);

 		// <mergedObjectID, dedupID>
 		Dataset<Row> idsToMerge = mergeRels
 			.where(col("relClass").equalTo(ModelConstants.MERGES))
 			.select(col("source").as("dedupID"), col("target").as("mergedObjectID"))
 			.distinct();

 		Dataset<Row> allRels = spark
 			.read()
 			.schema(REL_BEAN_ENC.schema())
 			.json(graphBasePath + "/relation");

 		Dataset<Relation> dedupedRels = allRels
 			.joinWith(idsToMerge, allRels.col("source").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
 			.joinWith(idsToMerge, col("_1.target").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
 			.select("_1._1", "_1._2.dedupID", "_2.dedupID")
 			.as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING()))
 			.map((MapFunction<Tuple3<Relation, String, String>, Relation>) t -> {
 				Relation rel = t._1();
 				String newSource = t._2();
 				String newTarget = t._3();

 				if (rel.getDataInfo() == null) {
 					rel.setDataInfo(new DataInfo());
 				}

 				if (newSource != null || newTarget != null) {
 					rel.getDataInfo().setDeletedbyinference(false);

 					if (newSource != null)
 						rel.setSource(newSource);

 					if (newTarget != null)
 						rel.setTarget(newTarget);
 				}

 				return rel;
 			}, REL_BEAN_ENC);

 		// ids of records that are both not deletedbyinference and not invisible
 		Dataset<Row> ids = validIds(spark, graphBasePath);

 		// filter relations that point to valid records, can force them to be visible
 		Dataset<Relation> cleanedRels = dedupedRels
 			.join(ids, col("source").equalTo(ids.col("id")), "leftsemi")
 			.join(ids, col("target").equalTo(ids.col("id")), "leftsemi")
 			.as(REL_BEAN_ENC)
 			.map((MapFunction<Relation, Relation>) r -> {
 				r.getDataInfo().setInvisible(false);
 				return r;
 			}, REL_KRYO_ENC);

 		Dataset<Relation> distinctRels = cleanedRels
 			.groupByKey(
@@ -0,0 +1,78 @@
package eu.dnetlib.dhp.api;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;

import org.apache.http.HttpHeaders;
import org.jetbrains.annotations.NotNull;

import com.google.gson.Gson;

/**
 * @author miriam.baglioni
 * @Date 06/10/23
 */
public class QueryCommunityAPI {
	private static final String baseUrl = "https://services.openaire.eu/openaire/";

	private static String get(String geturl) throws IOException {
		URL url = new URL(geturl);
		HttpURLConnection conn = (HttpURLConnection) url.openConnection();
		conn.setDoOutput(true);
		conn.setRequestMethod("GET");

		int responseCode = conn.getResponseCode();
		String body = getBody(conn);
		conn.disconnect();
		if (responseCode != HttpURLConnection.HTTP_OK)
			throw new IOException("Unexpected code " + responseCode + body);

		return body;
	}

	public static String communities() throws IOException {
		return get(baseUrl + "community/communities");
	}

	public static String community(String id) throws IOException {
		return get(baseUrl + "community/" + id);
	}

	public static String communityDatasource(String id) throws IOException {
		return get(baseUrl + "community/" + id + "/contentproviders");
	}

	public static String communityPropagationOrganization(String id) throws IOException {
		return get(baseUrl + "community/" + id + "/propagationOrganizations");
	}

	public static String communityProjects(String id, String page, String size) throws IOException {
		return get(baseUrl + "community/" + id + "/projects/" + page + "/" + size);
	}

	@NotNull
	private static String getBody(HttpURLConnection conn) throws IOException {
		String body = "{}";
		try (BufferedReader br = new BufferedReader(
			new InputStreamReader(conn.getInputStream(), "utf-8"))) {
			StringBuilder response = new StringBuilder();
			String responseLine = null;
			while ((responseLine = br.readLine()) != null) {
				response.append(responseLine.trim());
			}

			body = response.toString();
		}
		return body;
	}

}
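A minimal usage sketch for this client (not part of the diff; it assumes the production endpoint is reachable, and "dh-ch" is the community id also used in the tests at the bottom of this commit):

	// every method returns the raw JSON body of the corresponding endpoint
	String summaries = QueryCommunityAPI.communities();                  // community/communities
	String profile = QueryCommunityAPI.community("dh-ch");               // community/dh-ch
	String datasources = QueryCommunityAPI.communityDatasource("dh-ch"); // community/dh-ch/contentproviders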
@@ -0,0 +1,136 @@
package eu.dnetlib.dhp.api;

import com.amazonaws.util.StringUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.api.model.*;
import eu.dnetlib.dhp.bulktag.community.Community;
import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration;
import eu.dnetlib.dhp.bulktag.community.Provider;
import eu.dnetlib.dhp.bulktag.criteria.VerbResolver;
import eu.dnetlib.dhp.bulktag.criteria.VerbResolverFactory;

import javax.management.Query;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

/**
 * @author miriam.baglioni
 * @Date 09/10/23
 */
public class Utils implements Serializable {
	private static final ObjectMapper MAPPER = new ObjectMapper();
	private static final VerbResolver resolver = VerbResolverFactory.newInstance();

	public static CommunityConfiguration getCommunityConfiguration() throws IOException {
		final Map<String, Community> communities = Maps.newHashMap();
		List<Community> validCommunities = new ArrayList<>();
		getValidCommunities()
			.forEach(community -> {
				try {
					CommunityModel cm = MAPPER.readValue(QueryCommunityAPI.community(community.getId()), CommunityModel.class);
					validCommunities.add(getCommunity(cm));
				} catch (IOException e) {
					throw new RuntimeException(e);
				}
			});
		validCommunities.forEach(community -> {
			try {
				DatasourceList dl = MAPPER.readValue(QueryCommunityAPI.communityDatasource(community.getId()), DatasourceList.class);
				community.setProviders(dl.stream().map(d -> {
					// if (d.getEnabled() == null || Boolean.FALSE.equals(d.getEnabled()))
					// 	return null;
					Provider p = new Provider();
					p.setOpenaireId("10|" + d.getOpenaireId());
					p.setSelectionConstraints(d.getSelectioncriteria());
					if (p.getSelectionConstraints() != null)
						p.getSelectionConstraints().setSelection(resolver);
					return p;
				})
					.filter(Objects::nonNull)
					.collect(Collectors.toList()));
			} catch (IOException e) {
				throw new RuntimeException(e);
			}
		});

		validCommunities.forEach(community -> {
			if (community.isValid())
				communities.put(community.getId(), community);
		});
		return new CommunityConfiguration(communities);
	}

	private static Community getCommunity(CommunityModel cm) {
		Community c = new Community();
		c.setId(cm.getId());
		c.setZenodoCommunities(cm.getOtherZenodoCommunities());
		if (!StringUtils.isNullOrEmpty(cm.getZenodoCommunity()))
			c.getZenodoCommunities().add(cm.getZenodoCommunity());
		c.setSubjects(cm.getSubjects());
		c.getSubjects().addAll(cm.getFos());
		c.getSubjects().addAll(cm.getSdg());
		c.setConstraints(cm.getAdvancedConstraints());
		if (c.getConstraints() != null)
			c.getConstraints().setSelection(resolver);
		c.setRemoveConstraints(cm.getRemoveConstraints());
		if (c.getRemoveConstraints() != null)
			c.getRemoveConstraints().setSelection(resolver);
		return c;
	}

	public static List<CommunityModel> getValidCommunities() throws IOException {
		return MAPPER.readValue(QueryCommunityAPI.communities(), CommunitySummary.class)
			.stream()
			.filter(community -> !community.getStatus().equals("hidden") &&
				(community.getType().equals("ri") || community.getType().equals("community")))
			.collect(Collectors.toList());
	}

	public static CommunityEntityMap getCommunityOrganization() throws IOException {
		CommunityEntityMap organizationMap = new CommunityEntityMap();
		getValidCommunities()
			.forEach(community -> {
				String id = community.getId();
				try {
					List<String> associatedOrgs = MAPPER.readValue(QueryCommunityAPI.communityPropagationOrganization(id), OrganizationList.class);
					if (associatedOrgs.size() > 0) {
						organizationMap.put(id, associatedOrgs);
					}
				} catch (IOException e) {
					throw new RuntimeException(e);
				}
			});
		return organizationMap;
	}

	public static CommunityEntityMap getCommunityProjects() throws IOException {
		CommunityEntityMap projectMap = new CommunityEntityMap();
		getValidCommunities()
			.forEach(community -> {
				int page = -1;
				int size = 100;
				ContentModel cm = new ContentModel();
				List<String> projectList = new ArrayList<>();
				do {
					page++;
					try {
						cm = MAPPER.readValue(QueryCommunityAPI.communityProjects(community.getId(), String.valueOf(page), String.valueOf(size)), ContentModel.class);
						if (cm.getContent().size() > 0) {
							cm.getContent().forEach(p -> projectList.add("40|" + p.getOpenaireId()));
							projectMap.put(community.getId(), projectList);
						}
					} catch (IOException e) {
						throw new RuntimeException(e);
					}
				} while (!cm.getLast());
			});
		return projectMap;
	}
}
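A sketch of how the two map-producing entry points above are meant to be consumed (not part of the diff; the printed output is illustrative only):

	CommunityEntityMap orgs = Utils.getCommunityOrganization();  // community id -> organization ids
	CommunityEntityMap projects = Utils.getCommunityProjects();  // community id -> project ids, prefixed with "40|"
	projects.forEach((community, ids) -> System.out.println(community + " -> " + ids.size() + " projects"));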
@@ -0,0 +1,43 @@
package eu.dnetlib.dhp.api.model;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.google.gson.Gson;
import eu.dnetlib.dhp.bulktag.community.SelectionConstraints;

@JsonAutoDetect
@JsonIgnoreProperties(ignoreUnknown = true)
public class CommunityContentprovider {
	private String openaireId;
	private SelectionConstraints selectioncriteria;

	private String enabled;

	public String getEnabled() {
		return enabled;
	}

	public void setEnabled(String enabled) {
		this.enabled = enabled;
	}

	public String getOpenaireId() {
		return openaireId;
	}

	public void setOpenaireId(final String openaireId) {
		this.openaireId = openaireId;
	}

	public SelectionConstraints getSelectioncriteria() {
		return this.selectioncriteria;
	}

	public void setSelectioncriteria(SelectionConstraints selectioncriteria) {
		this.selectioncriteria = selectioncriteria;
	}
}
@@ -1,13 +1,13 @@

-package eu.dnetlib.dhp.resulttocommunityfromorganization;
+package eu.dnetlib.dhp.api.model;

 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;

-public class OrganizationMap extends HashMap<String, List<String>> {
+public class CommunityEntityMap extends HashMap<String, List<String>> {

-	public OrganizationMap() {
+	public CommunityEntityMap() {
 		super();
 	}
@@ -0,0 +1,108 @@
package eu.dnetlib.dhp.api.model;

import java.io.Serializable;
import java.util.List;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import eu.dnetlib.dhp.bulktag.community.SelectionConstraints;

/**
 * @author miriam.baglioni
 * @Date 06/10/23
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class CommunityModel implements Serializable {
	private String id;
	private String type;
	private String status;

	private String zenodoCommunity;
	private List<String> subjects;
	private List<String> otherZenodoCommunities;
	private List<String> fos;
	private List<String> sdg;
	private SelectionConstraints advancedConstraints;
	private SelectionConstraints removeConstraints;

	public String getZenodoCommunity() {
		return zenodoCommunity;
	}

	public void setZenodoCommunity(String zenodoCommunity) {
		this.zenodoCommunity = zenodoCommunity;
	}

	public List<String> getSubjects() {
		return subjects;
	}

	public void setSubjects(List<String> subjects) {
		this.subjects = subjects;
	}

	public List<String> getOtherZenodoCommunities() {
		return otherZenodoCommunities;
	}

	public void setOtherZenodoCommunities(List<String> otherZenodoCommunities) {
		this.otherZenodoCommunities = otherZenodoCommunities;
	}

	public List<String> getFos() {
		return fos;
	}

	public void setFos(List<String> fos) {
		this.fos = fos;
	}

	public List<String> getSdg() {
		return sdg;
	}

	public void setSdg(List<String> sdg) {
		this.sdg = sdg;
	}

	public SelectionConstraints getRemoveConstraints() {
		return removeConstraints;
	}

	public void setRemoveConstraints(SelectionConstraints removeConstraints) {
		this.removeConstraints = removeConstraints;
	}

	public SelectionConstraints getAdvancedConstraints() {
		return advancedConstraints;
	}

	public void setAdvancedConstraints(SelectionConstraints advancedConstraints) {
		this.advancedConstraints = advancedConstraints;
	}

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getType() {
		return type;
	}

	public void setType(String type) {
		this.type = type;
	}

	public String getStatus() {
		return status;
	}

	public void setStatus(String status) {
		this.status = status;
	}
}
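A hedged example of the deserialization this bean supports (not part of the diff; only the field names come from the class, the values and the extra field are invented):

	String json = "{\"id\":\"dh-ch\",\"type\":\"community\",\"status\":\"all\","
		+ "\"zenodoCommunity\":\"zc\",\"subjects\":[],\"otherZenodoCommunities\":[],"
		+ "\"fos\":[],\"sdg\":[],\"unknownField\":true}";
	// the unknown field is silently dropped thanks to @JsonIgnoreProperties(ignoreUnknown = true)
	CommunityModel cm = new ObjectMapper().readValue(json, CommunityModel.class);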
@@ -0,0 +1,16 @@
package eu.dnetlib.dhp.api.model;

import java.io.Serializable;
import java.util.ArrayList;

/**
 * @author miriam.baglioni
 * @Date 06/10/23
 */
public class CommunitySummary extends ArrayList<CommunityModel> implements Serializable {
	public CommunitySummary() {
		super();
	}
}
@@ -0,0 +1,50 @@
package eu.dnetlib.dhp.api.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

import java.io.Serializable;
import java.util.List;

/**
 * @author miriam.baglioni
 * @Date 09/10/23
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class ContentModel implements Serializable {
	private List<ProjectModel> content;
	private Integer totalPages;
	private Boolean last;
	private Integer number;

	public List<ProjectModel> getContent() {
		return content;
	}

	public void setContent(List<ProjectModel> content) {
		this.content = content;
	}

	public Integer getTotalPages() {
		return totalPages;
	}

	public void setTotalPages(Integer totalPages) {
		this.totalPages = totalPages;
	}

	public Boolean getLast() {
		return last;
	}

	public void setLast(Boolean last) {
		this.last = last;
	}

	public Integer getNumber() {
		return number;
	}

	public void setNumber(Integer number) {
		this.number = number;
	}
}
@@ -0,0 +1,12 @@
package eu.dnetlib.dhp.api.model;

import eu.dnetlib.dhp.api.model.CommunityContentprovider;

import java.io.Serializable;
import java.util.ArrayList;

public class DatasourceList extends ArrayList<CommunityContentprovider> implements Serializable {
	public DatasourceList() {
		super();
	}
}
@@ -0,0 +1,15 @@
package eu.dnetlib.dhp.api.model;

import java.io.Serializable;
import java.util.ArrayList;

/**
 * @author miriam.baglioni
 * @Date 09/10/23
 */
public class OrganizationList extends ArrayList<String> implements Serializable {

	public OrganizationList() {
		super();
	}
}
@@ -0,0 +1,23 @@
package eu.dnetlib.dhp.api.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

import java.io.Serializable;

/**
 * @author miriam.baglioni
 * @Date 09/10/23
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class ProjectModel implements Serializable {

	private String openaireId;

	public String getOpenaireId() {
		return openaireId;
	}

	public void setOpenaireId(String openaireId) {
		this.openaireId = openaireId;
	}
}
@@ -6,10 +6,10 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

 import java.util.*;

+import eu.dnetlib.dhp.api.Utils;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.ForeachFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -87,7 +87,7 @@ public class SparkBulkTagJob {
 		if (isTest) {
 			cc = CommunityConfigurationFactory.newInstance(taggingConf);
 		} else {
-			cc = QueryInformationSystem.getCommunityConfiguration(parser.get("isLookUpUrl"));
+			cc = Utils.getCommunityConfiguration();//QueryInformationSystem.getCommunityConfiguration(parser.get("isLookUpUrl"));
 		}

 		runWithSparkSession(
@@ -4,16 +4,18 @@ package eu.dnetlib.dhp.bulktag.community;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;

 import com.google.gson.Gson;

 /** Created by miriam on 01/08/2018. */
 public class Community implements Serializable {

 	private String id;
 	private List<String> subjects = new ArrayList<>();
 	private List<Provider> providers = new ArrayList<>();
-	private List<ZenodoCommunity> zenodoCommunities = new ArrayList<>();
+	private List<String> zenodoCommunities = new ArrayList<>();
 	private SelectionConstraints constraints = new SelectionConstraints();
 	private SelectionConstraints removeConstraints = new SelectionConstraints();
@@ -26,7 +28,7 @@ public class Community implements Serializable {
 		return !getSubjects().isEmpty()
 			|| !getProviders().isEmpty()
 			|| !getZenodoCommunities().isEmpty()
-			|| getConstraints().getCriteria() != null;
+			|| (Optional.ofNullable(getConstraints()).isPresent() && getConstraints().getCriteria() != null);
 	}

 	public String getId() {
@@ -53,11 +55,11 @@ public class Community implements Serializable {
 		this.providers = providers;
 	}

-	public List<ZenodoCommunity> getZenodoCommunities() {
+	public List<String> getZenodoCommunities() {
 		return zenodoCommunities;
 	}

-	public void setZenodoCommunities(List<ZenodoCommunity> zenodoCommunities) {
+	public void setZenodoCommunities(List<String> zenodoCommunities) {
 		this.zenodoCommunities = zenodoCommunities;
 	}
@@ -81,7 +81,7 @@ public class CommunityConfiguration implements Serializable {
 		this.removeConstraintsMap = removeConstraintsMap;
 	}

-	CommunityConfiguration(final Map<String, Community> communities) {
+	public CommunityConfiguration(final Map<String, Community> communities) {
 		this.communities = communities;
 		init();
 	}
@@ -117,10 +117,10 @@ public class CommunityConfiguration implements Serializable {
 			add(d.getOpenaireId(), new Pair<>(id, d.getSelectionConstraints()), datasourceMap);
 		}
 		// get zenodo communities
-		for (ZenodoCommunity zc : c.getZenodoCommunities()) {
+		for (String zc : c.getZenodoCommunities()) {
 			add(
-				zc.getZenodoCommunityId(),
-				new Pair<>(id, zc.getSelCriteria()),
+				zc,
+				new Pair<>(id, null),
 				zenodocommunityMap);
 		}
 		selectionConstraintsMap.put(id, c.getConstraints());
@@ -5,7 +5,6 @@ import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;

 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -143,16 +142,16 @@ public class CommunityConfigurationFactory {
 		return providerList;
 	}

-	private static List<ZenodoCommunity> parseZenodoCommunities(final Node node) {
+	private static List<String> parseZenodoCommunities(final Node node) {

 		final List<Node> list = node.selectNodes("./zenodocommunities/zenodocommunity");
-		final List<ZenodoCommunity> zenodoCommunityList = new ArrayList<>();
+		final List<String> zenodoCommunityList = new ArrayList<>();
 		for (Node n : list) {
-			ZenodoCommunity zc = new ZenodoCommunity();
-			zc.setZenodoCommunityId(n.selectSingleNode("./zenodoid").getText());
-			zc.setSelCriteria(n.selectSingleNode("./selcriteria"));
+			// ZenodoCommunity zc = new ZenodoCommunity();
+			// zc.setZenodoCommunityId(n.selectSingleNode("./zenodoid").getText());
+			// zc.setSelCriteria(n.selectSingleNode("./selcriteria"));

-			zenodoCommunityList.add(zc);
+			zenodoCommunityList.add(n.selectSingleNode("./zenodoid").getText());
 		}

 		log.info("size of the zenodo community list " + zenodoCommunityList.size());
@@ -6,12 +6,14 @@ import java.lang.reflect.InvocationTargetException;

 import eu.dnetlib.dhp.bulktag.criteria.Selection;
+import eu.dnetlib.dhp.bulktag.criteria.VerbResolver;
 import org.apache.htrace.fasterxml.jackson.annotation.JsonIgnore;

 public class Constraint implements Serializable {
 	private String verb;
 	private String field;
 	private String value;
 	// private String element;
+	@JsonIgnore
 	private Selection selection;

 	public String getVerb() {
@@ -37,11 +39,11 @@ public class Constraint implements Serializable {
 	public void setValue(String value) {
 		this.value = value;
 	}

-	public void setSelection(Selection sel) {
-		selection = sel;
-	}
+	//@JsonIgnore
+	//public void setSelection(Selection sel) {
+	//	selection = sel;
+	// }

 	@JsonIgnore
 	public void setSelection(VerbResolver resolver)
 		throws InvocationTargetException, NoSuchMethodException, InstantiationException,
 		IllegalAccessException {
@@ -52,11 +54,5 @@ public class Constraint implements Serializable {
 		return selection.apply(metadata);
 	}

-	// public String getElement() {
-	// 	return element;
-	// }
-	//
-	// public void setElement(String element) {
-	// 	this.element = element;
-	// }

 }
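After this change a Constraint no longer receives a ready-made Selection: it resolves one from its own verb and value through the VerbResolver, as the factory test further down exercises. A compact sketch (not part of the diff):

	Constraint c = new Constraint();
	c.setVerb("not_contains");
	c.setField("contributor");
	c.setValue("DARIAH");
	c.setSelection(resolver); // reflectively builds the Selection for verb + value
	boolean ok = c.verifyCriteria("This work has been partially supported by DARIAH-EU infrastructure");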
@@ -7,11 +7,12 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;

 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;

+import eu.dnetlib.dhp.bulktag.criteria.VerbResolver;

 @JsonAutoDetect
 public class SelectionConstraints implements Serializable {
 	private List<Constraints> criteria;
@@ -6,12 +6,12 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;

 import java.util.*;

+import eu.dnetlib.dhp.api.Utils;
+import eu.dnetlib.dhp.api.model.CommunityEntityMap;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
@@ -48,10 +48,11 @@ public class PrepareResultCommunitySet {
 		final String outputPath = parser.get("outputPath");
 		log.info("outputPath: {}", outputPath);

-		final OrganizationMap organizationMap = new Gson()
-			.fromJson(
-				parser.get("organizationtoresultcommunitymap"),
-				OrganizationMap.class);
+		// final CommunityEntityMap organizationMap = new Gson()
+		// 	.fromJson(
+		// 		parser.get("organizationtoresultcommunitymap"),
+		// 		CommunityEntityMap.class);
+		final CommunityEntityMap organizationMap = Utils.getCommunityOrganization();
 		log.info("organizationMap: {}", new Gson().toJson(organizationMap));

 		SparkConf conf = new SparkConf();
@@ -70,7 +71,7 @@ public class PrepareResultCommunitySet {
 		SparkSession spark,
 		String inputPath,
 		String outputPath,
-		OrganizationMap organizationMap) {
+		CommunityEntityMap organizationMap) {

 		Dataset<Relation> relation = readPath(spark, inputPath, Relation.class);
 		relation.createOrReplaceTempView("relation");
@@ -115,7 +116,7 @@ public class PrepareResultCommunitySet {
 	}

 	private static MapFunction<ResultOrganizations, ResultCommunityList> mapResultCommunityFn(
-		OrganizationMap organizationMap) {
+		CommunityEntityMap organizationMap) {
 		return value -> {
 			String rId = value.getResultId();
 			Optional<List<String>> orgs = Optional.ofNullable(value.getMerges());
@@ -4,10 +4,10 @@
 		<name>sourcePath</name>
 		<description>the source path</description>
 	</property>
-	<property>
-		<name>organizationtoresultcommunitymap</name>
-		<description>organization community map</description>
-	</property>
+	<!-- <property>-->
+	<!-- 	<name>organizationtoresultcommunitymap</name>-->
+	<!-- 	<description>organization community map</description>-->
+	<!-- </property>-->
 	<property>
 		<name>outputPath</name>
 		<description>the output path</description>
@@ -1568,4 +1568,42 @@ public class BulkTagJobTest {

 	}

+	@Test
+	void newConfTest() throws Exception {
+		final String pathMap = BulkTagJobTest.pathMap;
+		SparkBulkTagJob
+			.main(
+				new String[] {
+					"-isTest", Boolean.TRUE.toString(),
+					"-isSparkSessionManaged", Boolean.FALSE.toString(),
+					"-sourcePath",
+					getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/no_updates").getPath(),
+					"-taggingConf", taggingConf,
+					"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
+					"-outputPath", workingDir.toString() + "/dataset",
+					"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
+					"-pathMap", pathMap
+				});
+
+		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+		JavaRDD<Dataset> tmp = sc
+			.textFile(workingDir.toString() + "/dataset")
+			.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
+
+		Assertions.assertEquals(10, tmp.count());
+		org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
+			.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
+
+		verificationDataset.createOrReplaceTempView("dataset");
+
+		String query = "select id, MyT.id community "
+			+ "from dataset "
+			+ "lateral view explode(context) c as MyT "
+			+ "lateral view explode(MyT.datainfo) d as MyD "
+			+ "where MyD.inferenceprovenance = 'bulktagging'";
+
+		Assertions.assertEquals(0, spark.sql(query).count());
+	}
+
 }
@@ -47,7 +47,7 @@ class CommunityConfigurationFactoryTest {
 		sc.setVerb("not_contains");
 		sc.setField("contributor");
 		sc.setValue("DARIAH");
-		sc.setSelection(resolver.getSelectionCriteria(sc.getVerb(), sc.getValue()));
+		sc.setSelection(resolver);//.getSelectionCriteria(sc.getVerb(), sc.getValue()));
 		String metadata = "This work has been partially supported by DARIAH-EU infrastructure";
 		Assertions.assertFalse(sc.verifyCriteria(metadata));
 	}
@@ -0,0 +1,91 @@
package eu.dnetlib.dhp.bulktag;

import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.api.model.CommunityEntityMap;
import eu.dnetlib.dhp.bulktag.community.Community;
import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration;

import eu.dnetlib.dhp.api.model.CommunityModel;
import eu.dnetlib.dhp.api.model.CommunitySummary;
import eu.dnetlib.dhp.api.model.DatasourceList;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.api.QueryCommunityAPI;

import java.util.List;

/**
 * @author miriam.baglioni
 * @Date 06/10/23
 */
public class QueryCommunityAPITest {

	@Test
	void communityList() throws Exception {
		String body = QueryCommunityAPI.communities();
		new ObjectMapper()
			.readValue(body, CommunitySummary.class)
			.forEach(p -> {
				try {
					System.out.println(new ObjectMapper().writeValueAsString(p));
				} catch (JsonProcessingException e) {
					throw new RuntimeException(e);
				}
			});
	}

	@Test
	void community() throws Exception {
		String id = "dh-ch";
		String body = QueryCommunityAPI.community(id);
		System.out.println(new ObjectMapper().writeValueAsString(new ObjectMapper()
			.readValue(body, CommunityModel.class)));
	}

	@Test
	void communityDatasource() throws Exception {
		String id = "dh-ch";
		String body = QueryCommunityAPI.communityDatasource(id);
		new ObjectMapper()
			.readValue(body, DatasourceList.class)
			.forEach(ds -> {
				try {
					System.out.println(new ObjectMapper().writeValueAsString(ds));
				} catch (JsonProcessingException e) {
					throw new RuntimeException(e);
				}
			});
	}

	@Test
	void validCommunities() throws Exception {
		CommunityConfiguration cc = Utils.getCommunityConfiguration();
		System.out.println(cc.getCommunities().keySet());
		Community community = cc.getCommunities().get("aurora");
		Assertions.assertEquals(0, community.getSubjects().size());
		Assertions.assertEquals(null, community.getConstraints());
		Assertions.assertEquals(null, community.getRemoveConstraints());
		Assertions.assertEquals(2, community.getZenodoCommunities().size());
		Assertions.assertTrue(community.getZenodoCommunities().stream().anyMatch(c -> c.equals("aurora-universities-network")));
		Assertions.assertTrue(community.getZenodoCommunities().stream().anyMatch(c -> c.equals("university-of-innsbruck")));
		Assertions.assertEquals(35, community.getProviders().size());
		Assertions.assertEquals(35, community.getProviders().stream().filter(p -> p.getSelectionConstraints() == null).count());
	}

	@Test
	void getCommunityProjects() throws Exception {
		CommunityEntityMap projectMap = Utils.getCommunityProjects();
		Assertions.assertFalse(projectMap.containsKey("mes"));
		Assertions.assertEquals(33, projectMap.size());
		Assertions.assertTrue(projectMap.keySet().stream().allMatch(k -> projectMap.get(k).stream().allMatch(p -> p.startsWith("40|"))));
	}

}
@@ -1,14 +1,14 @@

 package eu.dnetlib.dhp.oa.graph.group;

 import static org.junit.jupiter.api.Assertions.assertEquals;

 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;

 import org.apache.commons.io.FileUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FilterFunction;

@@ -18,108 +18,108 @@ import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.*;

 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;

 import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.dhp.utils.DHPUtils;

 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 public class GroupEntitiesSparkJobTest {

 	private static SparkSession spark;

 	private static ObjectMapper mapper = new ObjectMapper()
 		.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

 	private static Path workingDir;
 	private Path dataInputPath;

 	private Path checkpointPath;

 	private Path outputPath;

 	@BeforeAll
 	public static void beforeAll() throws IOException {
 		workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName());

 		SparkConf conf = new SparkConf();
 		conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName());
 		conf.setMaster("local");
 		conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
 		conf.registerKryoClasses(ModelSupport.getOafModelClasses());
 		spark = SparkSession.builder().config(conf).getOrCreate();
 	}

 	@BeforeEach
 	public void beforeEach() throws IOException, URISyntaxException {
 		dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI());
 		checkpointPath = workingDir.resolve("grouped_entity");
 		outputPath = workingDir.resolve("dispatched_entity");
 	}

 	@AfterAll
 	public static void afterAll() throws IOException {
 		spark.stop();
 		FileUtils.deleteDirectory(workingDir.toFile());
 	}

 	@Test
 	@Order(1)
 	void testGroupEntities() throws Exception {
 		GroupEntitiesSparkJob.main(new String[] {
 			"-isSparkSessionManaged",
 			Boolean.FALSE.toString(),
 			"-graphInputPath",
 			dataInputPath.toString(),
 			"-checkpointPath",
 			checkpointPath.toString(),
 			"-outputPath",
 			outputPath.toString(),
 			"-filterInvisible",
 			Boolean.FALSE.toString()
 		});

 		Dataset<OafEntity> checkpointTable = spark
 			.read()
 			.load(checkpointPath.toString())
 			.selectExpr("COALESCE(*)")
 			.as(Encoders.kryo(OafEntity.class));

 		assertEquals(
 			1,
 			checkpointTable
 				.filter(
 					(FilterFunction<OafEntity>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9"
 						.equals(r.getId()) &&
 						r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo")))
 				.count());

 		Dataset<Result> output = spark
 			.read()
 			.textFile(
 				DHPUtils
 					.toSeq(
 						HdfsSupport
 							.listFiles(outputPath.toString(), spark.sparkContext().hadoopConfiguration())))
 			.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class));

 		assertEquals(3, output.count());
 		assertEquals(
 			2,
 			output
 				.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
 				.filter((FilterFunction<String>) s -> s.equals("publication"))
 				.count());
 		assertEquals(
 			1,
 			output
 				.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
 				.filter((FilterFunction<String>) s -> s.equals("dataset"))
 				.count());
 	}
 }