diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java
index 040c89782..8adc88920 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java
@@ -95,7 +95,7 @@ public class SparkAtomicActionScoreJob implements Serializable {
 		return projectScores.map((MapFunction<BipProjectModel, Project>) bipProjectScores -> {
 			Project project = new Project();
-			project.setId(bipProjectScores.getProjectId());
+			// project.setId(bipProjectScores.getProjectId());
 			project.setMeasures(bipProjectScores.toMeasures());
 			return project;
 		}, Encoders.bean(Project.class))
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
index 739295c91..cb1c70059 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
@@ -67,60 +67,60 @@ public class SparkPropagateRelation extends AbstractSparkAction {
 		log.info("graphOutputPath: '{}'", graphOutputPath);
 
 		Dataset<Relation> mergeRels = spark
-            .read()
-            .load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
-            .as(REL_BEAN_ENC);
+			.read()
+			.load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
+			.as(REL_BEAN_ENC);
 
 		// <dedupID, mergedObjectID>
 		Dataset<Row> idsToMerge = mergeRels
-            .where(col("relClass").equalTo(ModelConstants.MERGES))
-            .select(col("source").as("dedupID"), col("target").as("mergedObjectID"))
-            .distinct();
+			.where(col("relClass").equalTo(ModelConstants.MERGES))
+			.select(col("source").as("dedupID"), col("target").as("mergedObjectID"))
+			.distinct();
 
 		Dataset<Row> allRels = spark
-            .read()
-            .schema(REL_BEAN_ENC.schema())
-            .json(graphBasePath + "/relation");
+			.read()
+			.schema(REL_BEAN_ENC.schema())
+			.json(graphBasePath + "/relation");
 
 		Dataset<Relation> dedupedRels = allRels
-            .joinWith(idsToMerge, allRels.col("source").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
-            .joinWith(idsToMerge, col("_1.target").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
-            .select("_1._1", "_1._2.dedupID", "_2.dedupID")
-            .as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING()))
-            .map((MapFunction<Tuple3<Relation, String, String>, Relation>) t -> {
-                Relation rel = t._1();
-                String newSource = t._2();
-                String newTarget = t._3();
+			.joinWith(idsToMerge, allRels.col("source").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
+			.joinWith(idsToMerge, col("_1.target").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
+			.select("_1._1", "_1._2.dedupID", "_2.dedupID")
+			.as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING()))
+			.map((MapFunction<Tuple3<Relation, String, String>, Relation>) t -> {
+				Relation rel = t._1();
+				String newSource = t._2();
+				String newTarget = t._3();
 
-                if (rel.getDataInfo() == null) {
-                    rel.setDataInfo(new DataInfo());
-                }
+				if (rel.getDataInfo() == null) {
+					rel.setDataInfo(new DataInfo());
+				}
 
-                if (newSource != null || newTarget != null) {
-                    rel.getDataInfo().setDeletedbyinference(false);
+				if (newSource != null || newTarget != null) {
+					rel.getDataInfo().setDeletedbyinference(false);
 
-                    if (newSource != null)
-                        rel.setSource(newSource);
+					if (newSource != null)
+						rel.setSource(newSource);
 
-                    if (newTarget != null)
-                        rel.setTarget(newTarget);
-                }
+					if (newTarget != null)
+						rel.setTarget(newTarget);
+				}
 
-                return rel;
-            }, REL_BEAN_ENC);
+				return rel;
+			}, REL_BEAN_ENC);
 
 		// ids of records that are both not deletedbyinference and not invisible
 		Dataset<Row> ids = validIds(spark, graphBasePath);
 
 		// filter relations that point to valid records, can force them to be visible
 		Dataset<Relation> cleanedRels = dedupedRels
-            .join(ids, col("source").equalTo(ids.col("id")), "leftsemi")
-            .join(ids, col("target").equalTo(ids.col("id")), "leftsemi")
-            .as(REL_BEAN_ENC)
-            .map((MapFunction<Relation, Relation>) r -> {
-                r.getDataInfo().setInvisible(false);
-                return r;
-            }, REL_KRYO_ENC);
+			.join(ids, col("source").equalTo(ids.col("id")), "leftsemi")
+			.join(ids, col("target").equalTo(ids.col("id")), "leftsemi")
+			.as(REL_BEAN_ENC)
+			.map((MapFunction<Relation, Relation>) r -> {
+				r.getDataInfo().setInvisible(false);
+				return r;
+			}, REL_KRYO_ENC);
 
 		Dataset<Relation> distinctRels = cleanedRels
 			.groupByKey(
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java
new file mode 100644
index 000000000..635ee2027
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java
@@ -0,0 +1,78 @@
+
+package eu.dnetlib.dhp.api;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.apache.http.HttpHeaders;
+import org.jetbrains.annotations.NotNull;
+
+import com.google.gson.Gson;
+
+/**
+ * @author miriam.baglioni
+ * @Date 06/10/23
+ */
+public class QueryCommunityAPI {
+	private static final String baseUrl = "https://services.openaire.eu/openaire/";
+
+	private static String get(String geturl) throws IOException {
+		URL url = new URL(geturl);
+		HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+		conn.setDoOutput(true);
+		conn.setRequestMethod("GET");
+
+		int responseCode = conn.getResponseCode();
+		String body = getBody(conn);
+		conn.disconnect();
+		if (responseCode != HttpURLConnection.HTTP_OK)
+			throw new IOException("Unexpected code " + responseCode + body);
+
+		return body;
+	}
+
+	public static String communities() throws IOException {
+		return get(baseUrl + "community/communities");
+	}
+
+	public static String community(String id) throws IOException {
+		return get(baseUrl + "community/" + id);
+	}
+
+	public static String communityDatasource(String id) throws IOException {
+		return get(baseUrl + "community/" + id + "/contentproviders");
+	}
+
+	public static String communityPropagationOrganization(String id) throws IOException {
+		return get(baseUrl + "community/" + id + "/propagationOrganizations");
+	}
+
+	public static String communityProjects(String id, String page, String size) throws IOException {
+		return get(baseUrl + "community/" + id + "/projects/" + page + "/" + size);
+	}
+
+	@NotNull
+	private static String getBody(HttpURLConnection conn) throws IOException {
+		String body = "{}";
+		try (BufferedReader br = new BufferedReader(
+			new InputStreamReader(conn.getInputStream(), "utf-8"))) {
+			StringBuilder response = new StringBuilder();
+			String responseLine = null;
+			while ((responseLine = br.readLine()) != null) {
+				response.append(responseLine.trim());
+			}
+
+			body = response.toString();
+
+		}
+		return body;
+	}
+
+}
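Note on the new client: `get` reads the body via `conn.getInputStream()` (inside `getBody`) before acting on the status code, but `getInputStream()` itself throws on non-2xx responses, so the "Unexpected code" branch can never attach the error payload; falling back to `conn.getErrorStream()` on failure would fix that. `setDoOutput(true)` is also unnecessary for a plain GET, and the `HttpHeaders`/`Gson`/`OutputStream` imports are unused. A usage sketch, not part of the patch, assuming only the classes added in this changeset plus Jackson, which the module already depends on:

```java
// Fetch and parse the public community list with the new client.
ObjectMapper mapper = new ObjectMapper();
CommunitySummary communities = mapper.readValue(QueryCommunityAPI.communities(), CommunitySummary.class);
communities.forEach(c -> System.out.println(c.getId() + " [" + c.getType() + "]"));
```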
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java
new file mode 100644
index 000000000..5b3004a5d
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java
@@ -0,0 +1,136 @@
+package eu.dnetlib.dhp.api;
+
+import com.amazonaws.util.StringUtils;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
+import eu.dnetlib.dhp.api.model.*;
+import eu.dnetlib.dhp.bulktag.community.Community;
+import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration;
+import eu.dnetlib.dhp.bulktag.community.Provider;
+import eu.dnetlib.dhp.bulktag.criteria.VerbResolver;
+import eu.dnetlib.dhp.bulktag.criteria.VerbResolverFactory;
+
+import javax.management.Query;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * @author miriam.baglioni
+ * @Date 09/10/23
+ */
+public class Utils implements Serializable {
+	private static final ObjectMapper MAPPER = new ObjectMapper();
+	private static final VerbResolver resolver = VerbResolverFactory.newInstance();
+
+	public static CommunityConfiguration getCommunityConfiguration() throws IOException {
+		final Map<String, Community> communities = Maps.newHashMap();
+		List<Community> validCommunities = new ArrayList<>();
+		getValidCommunities()
+			.forEach(community -> {
+				try {
+					CommunityModel cm = MAPPER
+						.readValue(QueryCommunityAPI.community(community.getId()), CommunityModel.class);
+					validCommunities.add(getCommunity(cm));
+				} catch (IOException e) {
+					throw new RuntimeException(e);
+				}
+			});
+		validCommunities.forEach(community -> {
+			try {
+				DatasourceList dl = MAPPER
+					.readValue(QueryCommunityAPI.communityDatasource(community.getId()), DatasourceList.class);
+				community.setProviders(dl.stream().map(d -> {
+					// if (d.getEnabled() == null || Boolean.FALSE.equals(d.getEnabled()))
+					//	return null;
+					Provider p = new Provider();
+					p.setOpenaireId("10|" + d.getOpenaireId());
+					p.setSelectionConstraints(d.getSelectioncriteria());
+					if (p.getSelectionConstraints() != null)
+						p.getSelectionConstraints().setSelection(resolver);
+					return p;
+				})
+					.filter(Objects::nonNull)
+					.collect(Collectors.toList()));
+			} catch (IOException e) {
+				throw new RuntimeException(e);
+			}
+		});
+
+		validCommunities.forEach(community -> {
+			if (community.isValid())
+				communities.put(community.getId(), community);
+		});
+		return new CommunityConfiguration(communities);
+	}
+
+	private static Community getCommunity(CommunityModel cm) {
+		Community c = new Community();
+		c.setId(cm.getId());
+		c.setZenodoCommunities(cm.getOtherZenodoCommunities());
+		if (!StringUtils.isNullOrEmpty(cm.getZenodoCommunity()))
+			c.getZenodoCommunities().add(cm.getZenodoCommunity());
+		c.setSubjects(cm.getSubjects());
+		c.getSubjects().addAll(cm.getFos());
+		c.getSubjects().addAll(cm.getSdg());
+		c.setConstraints(cm.getAdvancedConstraints());
+		if (c.getConstraints() != null)
+			c.getConstraints().setSelection(resolver);
+		c.setRemoveConstraints(cm.getRemoveConstraints());
+		if (c.getRemoveConstraints() != null)
+			c.getRemoveConstraints().setSelection(resolver);
+		return c;
+	}
+
+	public static List<CommunityModel> getValidCommunities() throws IOException {
+		return MAPPER
+			.readValue(QueryCommunityAPI.communities(), CommunitySummary.class)
+			.stream()
+			.filter(
+				community -> !community.getStatus().equals("hidden") &&
+					(community.getType().equals("ri") ||
+						community.getType().equals("community")))
+			.collect(Collectors.toList());
+	}
+
+	public static CommunityEntityMap getCommunityOrganization() throws IOException {
+		CommunityEntityMap organizationMap = new CommunityEntityMap();
+		getValidCommunities()
+			.forEach(community -> {
+				String id = community.getId();
+				try {
+					List<String> associatedOrgs = MAPPER
+						.readValue(QueryCommunityAPI.communityPropagationOrganization(id), OrganizationList.class);
+					if (associatedOrgs.size() > 0) {
+						organizationMap.put(id, associatedOrgs);
+					}
+				} catch (IOException e) {
+					throw new RuntimeException(e);
+				}
+			});
+		return organizationMap;
+	}
+
+	public static CommunityEntityMap getCommunityProjects() throws IOException {
+		CommunityEntityMap projectMap = new CommunityEntityMap();
+		getValidCommunities()
+			.forEach(community -> {
+				int page = -1;
+				int size = 100;
+				ContentModel cm = new ContentModel();
+				List<String> projectList = new ArrayList<>();
+				do {
+					page++;
+					try {
+						cm = MAPPER
+							.readValue(
+								QueryCommunityAPI
+									.communityProjects(community.getId(), String.valueOf(page), String.valueOf(size)),
+								ContentModel.class);
+						if (cm.getContent().size() > 0) {
+							cm.getContent().forEach(p -> projectList.add("40|" + p.getOpenaireId()));
+							projectMap.put(community.getId(), projectList);
+						}
+					} catch (IOException e) {
+						throw new RuntimeException(e);
+					}
+				} while (!cm.getLast());
+			});
+		return projectMap;
+	}
+}
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/CommunityContentprovider.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/CommunityContentprovider.java
new file mode 100644
index 000000000..5378e556a
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/CommunityContentprovider.java
@@ -0,0 +1,43 @@
+package eu.dnetlib.dhp.api.model;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.google.gson.Gson;
+import eu.dnetlib.dhp.bulktag.community.SelectionConstraints;
+
+@JsonAutoDetect
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CommunityContentprovider {
+	private String openaireId;
+	private SelectionConstraints selectioncriteria;
+
+	private String enabled;
+
+	public String getEnabled() {
+		return enabled;
+	}
+
+	public void setEnabled(String enabled) {
+		this.enabled = enabled;
+	}
+
+	public String getOpenaireId() {
+		return openaireId;
+	}
+
+	public void setOpenaireId(final String openaireId) {
+		this.openaireId = openaireId;
+	}
+
+	public SelectionConstraints getSelectioncriteria() {
+
+		return this.selectioncriteria;
+	}
+
+	public void setSelectioncriteria(SelectionConstraints selectioncriteria) {
+		this.selectioncriteria = selectioncriteria;
+
+	}
+}
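Two small observations on `Utils`: the project pagination unboxes `cm.getLast()` in the loop condition, which would NPE if the service ever omitted the `last` field, and every API failure inside the lambdas is rethrown as an unchecked `RuntimeException`, so one unreachable community aborts the whole configuration build. For orientation, these are the three entry points the rest of the patch consumes (usage sketch, not part of the patch):

```java
// community id -> tagging configuration, fetched live from the Community API
CommunityConfiguration cc = Utils.getCommunityConfiguration();
// community id -> organization ids used for result-to-community propagation
CommunityEntityMap orgs = Utils.getCommunityOrganization();
// community id -> "40|"-prefixed project OpenAIRE ids, paged 100 at a time
CommunityEntityMap projects = Utils.getCommunityProjects();
```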
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/OrganizationMap.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/CommunityEntityMap.java
similarity index 59%
rename from dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/OrganizationMap.java
rename to dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/CommunityEntityMap.java
index 7d786058a..ca3eb2857 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/OrganizationMap.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/CommunityEntityMap.java
@@ -1,13 +1,13 @@
-package eu.dnetlib.dhp.resulttocommunityfromorganization;
+package eu.dnetlib.dhp.api.model;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 
-public class OrganizationMap extends HashMap<String, List<String>> {
+public class CommunityEntityMap extends HashMap<String, List<String>> {
 
-	public OrganizationMap() {
+	public CommunityEntityMap() {
 		super();
 	}
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/CommunityModel.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/CommunityModel.java
new file mode 100644
index 000000000..c5b3fac7c
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/CommunityModel.java
@@ -0,0 +1,108 @@
+
+package eu.dnetlib.dhp.api.model;
+
+import java.io.Serializable;
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+import eu.dnetlib.dhp.bulktag.community.SelectionConstraints;
+
+/**
+ * @author miriam.baglioni
+ * @Date 06/10/23
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CommunityModel implements Serializable {
+	private String id;
+	private String type;
+	private String status;
+
+	private String zenodoCommunity;
+	private List<String> subjects;
+	private List<String> otherZenodoCommunities;
+	private List<String> fos;
+	private List<String> sdg;
+	private SelectionConstraints advancedConstraints;
+	private SelectionConstraints removeConstraints;
+
+	public String getZenodoCommunity() {
+		return zenodoCommunity;
+	}
+
+	public void setZenodoCommunity(String zenodoCommunity) {
+		this.zenodoCommunity = zenodoCommunity;
+	}
+
+	public List<String> getSubjects() {
+		return subjects;
+	}
+
+	public void setSubjects(List<String> subjects) {
+		this.subjects = subjects;
+	}
+
+	public List<String> getOtherZenodoCommunities() {
+		return otherZenodoCommunities;
+	}
+
+	public void setOtherZenodoCommunities(List<String> otherZenodoCommunities) {
+		this.otherZenodoCommunities = otherZenodoCommunities;
+	}
+
+	public List<String> getFos() {
+		return fos;
+	}
+
+	public void setFos(List<String> fos) {
+		this.fos = fos;
+	}
+
+	public List<String> getSdg() {
+		return sdg;
+	}
+
+	public void setSdg(List<String> sdg) {
+		this.sdg = sdg;
+	}
+
+	public SelectionConstraints getRemoveConstraints() {
+		return removeConstraints;
+	}
+
+	public void setRemoveConstraints(SelectionConstraints removeConstraints) {
+		this.removeConstraints = removeConstraints;
+	}
+
+	public SelectionConstraints getAdvancedConstraints() {
+		return advancedConstraints;
+	}
+
+	public void setAdvancedConstraints(SelectionConstraints advancedConstraints) {
+		this.advancedConstraints = advancedConstraints;
+	}
+
+	public String getId() {
+		return id;
+	}
+
+	public void setId(String id) {
+		this.id = id;
+	}
+
+	public String getType() {
+		return type;
+	}
+
+	public void setType(String type) {
+		this.type = type;
+	}
+
+	public String getStatus() {
+		return status;
+	}
+
+	public void setStatus(String status) {
+		this.status = status;
+	}
+}
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/CommunitySummary.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/CommunitySummary.java
new file mode 100644
index 000000000..774b6c874
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/CommunitySummary.java
@@ -0,0 +1,16 @@
+package eu.dnetlib.dhp.api.model;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/**
+ * @author miriam.baglioni
+ * @Date 06/10/23
+ */
+public class CommunitySummary extends ArrayList<CommunityModel> implements Serializable {
+	public CommunitySummary() {
+		super();
+	}
+}
+
+
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/ContentModel.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/ContentModel.java
new file mode 100644
index 000000000..a755a9455
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/ContentModel.java
@@ -0,0 +1,50 @@
+package eu.dnetlib.dhp.api.model;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * @author miriam.baglioni
+ * @Date 09/10/23
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ContentModel implements Serializable {
+	private List<ProjectModel> content;
+	private Integer totalPages;
+	private Boolean last;
+	private Integer number;
+
+	public List<ProjectModel> getContent() {
+		return content;
+	}
+
+	public void setContent(List<ProjectModel> content) {
+		this.content = content;
+	}
+
+	public Integer getTotalPages() {
+		return totalPages;
+	}
+
+	public void setTotalPages(Integer totalPages) {
+		this.totalPages = totalPages;
+	}
+
+	public Boolean getLast() {
+		return last;
+	}
+
+	public void setLast(Boolean last) {
+		this.last = last;
+	}
+
+	public Integer getNumber() {
+		return number;
+	}
+
+	public void setNumber(Integer number) {
+		this.number = number;
+	}
+}
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/DatasourceList.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/DatasourceList.java
new file mode 100644
index 000000000..8e31d7612
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/DatasourceList.java
@@ -0,0 +1,12 @@
+package eu.dnetlib.dhp.api.model;
+
+import eu.dnetlib.dhp.api.model.CommunityContentprovider;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+public class DatasourceList extends ArrayList<CommunityContentprovider> implements Serializable {
+	public DatasourceList() {
+		super();
+	}
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/OrganizationList.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/OrganizationList.java
new file mode 100644
index 000000000..6895967ff
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/OrganizationList.java
@@ -0,0 +1,15 @@
+package eu.dnetlib.dhp.api.model;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/**
+ * @author miriam.baglioni
+ * @Date 09/10/23
+ */
+public class OrganizationList extends ArrayList<String> implements Serializable {
+
+	public OrganizationList() {
+		super();
+	}
+}
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/ProjectModel.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/ProjectModel.java
new file mode 100644
index 000000000..50bdcf649
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/ProjectModel.java
@@ -0,0 +1,23 @@
+package eu.dnetlib.dhp.api.model;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+import java.io.Serializable;
+
+/**
+ * @author miriam.baglioni
+ * @Date 09/10/23
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ProjectModel implements Serializable {
+
+	private String openaireId;
+
+	public String getOpenaireId() {
+		return openaireId;
+	}
+
+	public void setOpenaireId(String openaireId) {
+		this.openaireId = openaireId;
+	}
+}
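The model classes that subclass `ArrayList` (`CommunitySummary`, `DatasourceList`, `OrganizationList`) exist so Jackson can bind a top-level JSON array while keeping the element type reified in the class declaration. A minimal sketch of the equivalence, assuming only Jackson (the payload is illustrative):

```java
ObjectMapper mapper = new ObjectMapper();
String json = "[{\"id\":\"dh-ch\",\"type\":\"community\"}]"; // illustrative payload

// with the subclass, the element type comes from the superclass type argument
CommunitySummary viaSubclass = mapper.readValue(json, CommunitySummary.class);

// without it, a TypeReference is needed to get around erasure
List<CommunityModel> viaTypeRef = mapper
	.readValue(json, new com.fasterxml.jackson.core.type.TypeReference<List<CommunityModel>>() {
	});
```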
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java
index 3186ed5c0..b61bc2c9f 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java
@@ -6,10 +6,10 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 import java.util.*;
 
+import eu.dnetlib.dhp.api.Utils;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FilterFunction;
-import org.apache.spark.api.java.function.ForeachFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -87,7 +87,7 @@ public class SparkBulkTagJob {
 		if (isTest) {
 			cc = CommunityConfigurationFactory.newInstance(taggingConf);
 		} else {
-			cc = QueryInformationSystem.getCommunityConfiguration(parser.get("isLookUpUrl"));
+			cc = Utils.getCommunityConfiguration();// QueryInformationSystem.getCommunityConfiguration(parser.get("isLookUpUrl"));
 		}
 
 		runWithSparkSession(
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Community.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Community.java
index b44376e22..d281f8783 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Community.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Community.java
@@ -4,16 +4,18 @@ package eu.dnetlib.dhp.bulktag.community;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import com.google.gson.Gson;
 
+
 /** Created by miriam on 01/08/2018. */
 public class Community implements Serializable {
 
 	private String id;
 	private List<String> subjects = new ArrayList<>();
 	private List<Provider> providers = new ArrayList<>();
-	private List<ZenodoCommunity> zenodoCommunities = new ArrayList<>();
+	private List<String> zenodoCommunities = new ArrayList<>();
 	private SelectionConstraints constraints = new SelectionConstraints();
 	private SelectionConstraints removeConstraints = new SelectionConstraints();
@@ -26,7 +28,7 @@ public class Community implements Serializable {
 		return !getSubjects().isEmpty()
 			|| !getProviders().isEmpty()
 			|| !getZenodoCommunities().isEmpty()
-			|| getConstraints().getCriteria() != null;
+			|| (Optional.ofNullable(getConstraints()).isPresent() && getConstraints().getCriteria() != null);
 	}
 
 	public String getId() {
@@ -53,11 +55,11 @@ public class Community implements Serializable {
 		this.providers = providers;
 	}
 
-	public List<ZenodoCommunity> getZenodoCommunities() {
+	public List<String> getZenodoCommunities() {
 		return zenodoCommunities;
 	}
 
-	public void setZenodoCommunities(List<ZenodoCommunity> zenodoCommunities) {
+	public void setZenodoCommunities(List<String> zenodoCommunities) {
 		this.zenodoCommunities = zenodoCommunities;
 	}
 
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/CommunityConfiguration.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/CommunityConfiguration.java
index e061ccd5e..a658c7ff5 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/CommunityConfiguration.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/CommunityConfiguration.java
@@ -81,7 +81,7 @@ public class CommunityConfiguration implements Serializable {
 		this.removeConstraintsMap = removeConstraintsMap;
 	}
 
-	CommunityConfiguration(final Map<String, Community> communities) {
+	public CommunityConfiguration(final Map<String, Community> communities) {
 		this.communities = communities;
 		init();
 	}
@@ -117,10 +117,10 @@ public class CommunityConfiguration implements Serializable {
 				add(d.getOpenaireId(), new Pair<>(id, d.getSelectionConstraints()), datasourceMap);
 			}
 			// get zenodo communities
-			for (ZenodoCommunity zc : c.getZenodoCommunities()) {
+			for (String zc : c.getZenodoCommunities()) {
 				add(
-					zc.getZenodoCommunityId(),
-					new Pair<>(id, zc.getSelCriteria()),
+					zc,
+					new Pair<>(id, null),
 					zenodocommunityMap);
 			}
 			selectionConstraintsMap.put(id, c.getConstraints());
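Two related changes here: `CommunityConfiguration`'s constructor goes from package-private to public because `Utils.getCommunityConfiguration()` now builds it from outside the `bulktag.community` package, and Zenodo communities shrink from `ZenodoCommunity` beans to bare id strings, so the per-Zenodo-community selection-criteria slot is now always `null` (`new Pair<>(id, null)`). A sketch of the new construction path, mirroring what `Utils` does (ids illustrative; assumes `java.util.Arrays` and Guava's `Maps`):

```java
Map<String, Community> communities = Maps.newHashMap();
Community community = new Community();
community.setId("dh-ch"); // illustrative community id
community.setZenodoCommunities(new ArrayList<>(Arrays.asList("some-zenodo-id"))); // plain ids now
communities.put(community.getId(), community);
CommunityConfiguration cc = new CommunityConfiguration(communities); // legal now that the ctor is public
```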
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/CommunityConfigurationFactory.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/CommunityConfigurationFactory.java
index 7b9e03ef6..013bf7168 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/CommunityConfigurationFactory.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/CommunityConfigurationFactory.java
@@ -5,7 +5,6 @@ import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -143,16 +142,16 @@ public class CommunityConfigurationFactory {
 		return providerList;
 	}
 
-	private static List<ZenodoCommunity> parseZenodoCommunities(final Node node) {
+	private static List<String> parseZenodoCommunities(final Node node) {
 		final List<Node> list = node.selectNodes("./zenodocommunities/zenodocommunity");
-		final List<ZenodoCommunity> zenodoCommunityList = new ArrayList<>();
+		final List<String> zenodoCommunityList = new ArrayList<>();
 		for (Node n : list) {
-			ZenodoCommunity zc = new ZenodoCommunity();
-			zc.setZenodoCommunityId(n.selectSingleNode("./zenodoid").getText());
-			zc.setSelCriteria(n.selectSingleNode("./selcriteria"));
+//			ZenodoCommunity zc = new ZenodoCommunity();
+//			zc.setZenodoCommunityId(n.selectSingleNode("./zenodoid").getText());
+//			zc.setSelCriteria(n.selectSingleNode("./selcriteria"));
 
-			zenodoCommunityList.add(zc);
+			zenodoCommunityList.add(n.selectSingleNode("./zenodoid").getText());
 		}
 
 		log.info("size of the zenodo community list " + zenodoCommunityList.size());
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Constraint.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Constraint.java
index ed58cc14d..03ae1117b 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Constraint.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Constraint.java
@@ -6,12 +6,14 @@ import java.lang.reflect.InvocationTargetException;
 
 import eu.dnetlib.dhp.bulktag.criteria.Selection;
 import eu.dnetlib.dhp.bulktag.criteria.VerbResolver;
+import org.apache.htrace.fasterxml.jackson.annotation.JsonIgnore;
 
 public class Constraint implements Serializable {
 	private String verb;
 	private String field;
 	private String value;
 	// private String element;
+	@JsonIgnore
 	private Selection selection;
 
 	public String getVerb() {
@@ -37,11 +39,11 @@ public class Constraint implements Serializable {
 	public void setValue(String value) {
 		this.value = value;
 	}
-
-	public void setSelection(Selection sel) {
-		selection = sel;
-	}
-
+	// @JsonIgnore
+	// public void setSelection(Selection sel) {
+	//	selection = sel;
+	// }
+
+	@JsonIgnore
 	public void setSelection(VerbResolver resolver)
 		throws InvocationTargetException, NoSuchMethodException, InstantiationException,
 		IllegalAccessException {
@@ -52,11 +54,5 @@ public class Constraint implements Serializable {
 		return selection.apply(metadata);
 	}
 
-	// public String getElement() {
-	// 	return element;
-	// }
-	//
-	// public void setElement(String element) {
-	// 	this.element = element;
-	// }
+
 }
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/SelectionConstraints.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/SelectionConstraints.java
index c7dcce812..8e6d25e9b 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/SelectionConstraints.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/SelectionConstraints.java
@@ -7,11 +7,12 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 
 import eu.dnetlib.dhp.bulktag.criteria.VerbResolver;
-
+@JsonAutoDetect
 public class SelectionConstraints implements Serializable {
 	private List<Constraint> criteria;
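Careful with the new `@JsonIgnore` import: it resolves to htrace's shaded Jackson copy (`org.apache.htrace.fasterxml.jackson.annotation.JsonIgnore`), which a regular `com.fasterxml.jackson.databind.ObjectMapper` does not recognize, so `selection` may still trip (de)serialization unless the import is switched to `com.fasterxml.jackson.annotation.JsonIgnore`. The intent of the annotation, sketched with the standard import (`resolver` is the `VerbResolver` the classes above already hold):

```java
ObjectMapper mapper = new ObjectMapper();
Constraint c = new Constraint();
c.setVerb("not_contains");
c.setField("contributor");
c.setValue("DARIAH");
c.setSelection(resolver);                   // resolves the Selection from verb + value

String json = mapper.writeValueAsString(c); // selection is meant to be skipped here
Constraint back = mapper.readValue(json, Constraint.class);
back.setSelection(resolver);                // selection is transient state: re-resolve after reading
```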
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java
index 0fc8cb390..ff6b73286 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java
@@ -6,12 +6,12 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
 
 import java.util.*;
 
+import eu.dnetlib.dhp.api.Utils;
+import eu.dnetlib.dhp.api.model.CommunityEntityMap;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
@@ -48,10 +48,11 @@ public class PrepareResultCommunitySet {
 		final String outputPath = parser.get("outputPath");
 		log.info("outputPath: {}", outputPath);
 
-		final OrganizationMap organizationMap = new Gson()
-			.fromJson(
-				parser.get("organizationtoresultcommunitymap"),
-				OrganizationMap.class);
+//		final CommunityEntityMap organizationMap = new Gson()
+//			.fromJson(
+//				parser.get("organizationtoresultcommunitymap"),
+//				CommunityEntityMap.class);
+		final CommunityEntityMap organizationMap = Utils.getCommunityOrganization();
 		log.info("organizationMap: {}", new Gson().toJson(organizationMap));
 
 		SparkConf conf = new SparkConf();
@@ -70,7 +71,7 @@ public class PrepareResultCommunitySet {
 		SparkSession spark,
 		String inputPath,
 		String outputPath,
-		OrganizationMap organizationMap) {
+		CommunityEntityMap organizationMap) {
 		Dataset<Relation> relation = readPath(spark, inputPath, Relation.class);
 		relation.createOrReplaceTempView("relation");
 
@@ -115,7 +116,7 @@ public class PrepareResultCommunitySet {
 	}
 
 	private static MapFunction<ResultOrganizations, ResultCommunityList> mapResultCommunityFn(
-		OrganizationMap organizationMap) {
+		CommunityEntityMap organizationMap) {
 		return value -> {
 			String rId = value.getResultId();
 			Optional<List<String>> orgs = Optional.ofNullable(value.getMerges());
diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromorganization/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromorganization/oozie_app/workflow.xml
index 6a329fdc4..d9805ab7b 100644
--- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromorganization/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromorganization/oozie_app/workflow.xml
@@ -4,10 +4,10 @@
             <name>sourcePath</name>
             <description>the source path</description>
         </property>
-        <property>
-            <name>organizationtoresultcommunitymap</name>
-            <description>organization community map</description>
-        </property>
+<!--        <property>-->
+<!--            <name>organizationtoresultcommunitymap</name>-->
+<!--            <description>organization community map</description>-->
+<!--        </property>-->
         <property>
             <name>outputPath</name>
             <description>the output path</description>
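With this change the community-to-organization map is no longer a workflow parameter: it is fetched from the Community API when the job starts, and the corresponding `<property>` block in `workflow.xml` is retired. The existing `log.info("organizationMap: ...")` line thus becomes the only record of the map a given run actually used, which is worth keeping in mind for reproducibility. In brief, what the driver now does at startup:

```java
// was: new Gson().fromJson(parser.get("organizationtoresultcommunitymap"), CommunityEntityMap.class)
final CommunityEntityMap organizationMap = Utils.getCommunityOrganization();
log.info("organizationMap: {}", new Gson().toJson(organizationMap));
```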
diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java
index 660a55472..745cd7e6f 100644
--- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java
+++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java
@@ -1568,4 +1568,42 @@ public class BulkTagJobTest {
 
 	}
 
+	@Test
+	void newConfTest() throws Exception {
+		final String pathMap = BulkTagJobTest.pathMap;
+		SparkBulkTagJob
+			.main(
+				new String[] {
+					"-isTest", Boolean.TRUE.toString(),
+					"-isSparkSessionManaged", Boolean.FALSE.toString(),
+					"-sourcePath",
+					getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/no_updates").getPath(),
+					"-taggingConf", taggingConf,
+					"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
+					"-outputPath", workingDir.toString() + "/dataset",
+					"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
+					"-pathMap", pathMap
+				});
+
+		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+		JavaRDD<Dataset> tmp = sc
+			.textFile(workingDir.toString() + "/dataset")
+			.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
+
+		Assertions.assertEquals(10, tmp.count());
+		org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
+			.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
+
+		verificationDataset.createOrReplaceTempView("dataset");
+
+		String query = "select id, MyT.id community "
+			+ "from dataset "
+			+ "lateral view explode(context) c as MyT "
+			+ "lateral view explode(MyT.datainfo) d as MyD "
+			+ "where MyD.inferenceprovenance = 'bulktagging'";
+
+		Assertions.assertEquals(0, spark.sql(query).count());
+	}
+
 }
diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/CommunityConfigurationFactoryTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/CommunityConfigurationFactoryTest.java
index c8fd62c8e..7e12bf9c3 100644
--- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/CommunityConfigurationFactoryTest.java
+++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/CommunityConfigurationFactoryTest.java
@@ -47,7 +47,7 @@ class CommunityConfigurationFactoryTest {
 		sc.setVerb("not_contains");
 		sc.setField("contributor");
 		sc.setValue("DARIAH");
-		sc.setSelection(resolver.getSelectionCriteria(sc.getVerb(), sc.getValue()));
+		sc.setSelection(resolver);// .getSelectionCriteria(sc.getVerb(), sc.getValue()));
 		String metadata = "This work has been partially supported by DARIAH-EU infrastructure";
 		Assertions.assertFalse(sc.verifyCriteria(metadata));
 	}
diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/QueryCommunityAPITest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/QueryCommunityAPITest.java
new file mode 100644
index 000000000..1b3403535
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/QueryCommunityAPITest.java
@@ -0,0 +1,91 @@
+
+package eu.dnetlib.dhp.bulktag;
+
+import eu.dnetlib.dhp.api.Utils;
+import eu.dnetlib.dhp.api.model.CommunityEntityMap;
+import eu.dnetlib.dhp.bulktag.community.Community;
+import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration;
+
+import eu.dnetlib.dhp.api.model.CommunityModel;
+import eu.dnetlib.dhp.api.model.CommunitySummary;
+import eu.dnetlib.dhp.api.model.DatasourceList;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.api.QueryCommunityAPI;
+
+import java.util.List;
+
+/**
+ * @author miriam.baglioni
+ * @Date 06/10/23
+ */
+public class QueryCommunityAPITest {
+
+	@Test
+	void communityList() throws Exception {
+		String body = QueryCommunityAPI.communities();
+		new ObjectMapper()
+			.readValue(body, CommunitySummary.class)
+			.forEach(p -> {
+				try {
+					System.out.println(new ObjectMapper().writeValueAsString(p));
+				} catch (JsonProcessingException e) {
+					throw new RuntimeException(e);
+				}
+			});
+	}
+
+	@Test
+	void community() throws Exception {
+		String id = "dh-ch";
+		String body = QueryCommunityAPI.community(id);
+		System.out
+			.println(
+				new ObjectMapper()
+					.writeValueAsString(
+						new ObjectMapper()
+							.readValue(body, CommunityModel.class)));
+	}
+
+	@Test
+	void communityDatasource() throws Exception {
+		String id = "dh-ch";
+		String body = QueryCommunityAPI.communityDatasource(id);
+		new ObjectMapper()
+			.readValue(body, DatasourceList.class)
+			.forEach(ds -> {
+				try {
+					System.out.println(new ObjectMapper().writeValueAsString(ds));
+				} catch (JsonProcessingException e) {
+					throw new RuntimeException(e);
+				}
+			});
+	}
+
+	@Test
+	void validCommunities() throws Exception {
+		CommunityConfiguration cc = Utils.getCommunityConfiguration();
+		System.out.println(cc.getCommunities().keySet());
+		Community community = cc.getCommunities().get("aurora");
+		Assertions.assertEquals(0, community.getSubjects().size());
+		Assertions.assertEquals(null, community.getConstraints());
+		Assertions.assertEquals(null, community.getRemoveConstraints());
+		Assertions.assertEquals(2, community.getZenodoCommunities().size());
+		Assertions
+			.assertTrue(
+				community.getZenodoCommunities().stream().anyMatch(c -> c.equals("aurora-universities-network")));
+		Assertions
+			.assertTrue(
+				community.getZenodoCommunities().stream().anyMatch(c -> c.equals("university-of-innsbruck")));
+		Assertions.assertEquals(35, community.getProviders().size());
+		Assertions
+			.assertEquals(
+				35, community.getProviders().stream().filter(p -> p.getSelectionConstraints() == null).count());
+	}
+
+	@Test
+	void getCommunityProjects() throws Exception {
+		CommunityEntityMap projectMap = Utils.getCommunityProjects();
+		Assertions.assertFalse(projectMap.containsKey("mes"));
+		Assertions.assertEquals(33, projectMap.size());
+		Assertions
+			.assertTrue(
+				projectMap
+					.keySet()
+					.stream()
+					.allMatch(k -> projectMap.get(k).stream().allMatch(p -> p.startsWith("40|"))));
+	}
+
+}
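These tests call the production service and pin live registry state (35 providers for `aurora`, 33 communities with projects, `mes` having none), so they will drift as the registry changes and fail when offline. A softer variant, sketched with JUnit 5 assumptions; this is a hypothetical refactoring, not part of the patch:

```java
import static org.junit.jupiter.api.Assumptions.assumeTrue;

CommunityConfiguration cc = Utils.getCommunityConfiguration();
Community aurora = cc.getCommunities().get("aurora");
assumeTrue(aurora != null, "live API unreachable or community removed");
// structural assertions survive registry churn better than exact counts
Assertions.assertTrue(aurora.getProviders().stream().allMatch(p -> p.getOpenaireId().startsWith("10|")));
```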
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java
index b878e778e..0887adf45 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java
@@ -1,14 +1,14 @@
 
 package eu.dnetlib.dhp.oa.graph.group;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
-import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.OafEntity;
-import eu.dnetlib.dhp.schema.oaf.Result;
-import eu.dnetlib.dhp.utils.DHPUtils;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FilterFunction;
@@ -18,108 +18,108 @@ import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.*;
 
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.utils.DHPUtils;
 
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 public class GroupEntitiesSparkJobTest {
 
-    private static SparkSession spark;
+	private static SparkSession spark;
 
-    private static ObjectMapper mapper = new ObjectMapper()
-        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+	private static ObjectMapper mapper = new ObjectMapper()
+		.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
 
-    private static Path workingDir;
-    private Path dataInputPath;
+	private static Path workingDir;
+	private Path dataInputPath;
 
-    private Path checkpointPath;
+	private Path checkpointPath;
 
-    private Path outputPath;
+	private Path outputPath;
 
-    @BeforeAll
-    public static void beforeAll() throws IOException {
-        workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName());
+	@BeforeAll
+	public static void beforeAll() throws IOException {
+		workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName());
 
-        SparkConf conf = new SparkConf();
-        conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName());
-        conf.setMaster("local");
-        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-        conf.registerKryoClasses(ModelSupport.getOafModelClasses());
-        spark = SparkSession.builder().config(conf).getOrCreate();
-    }
+		SparkConf conf = new SparkConf();
+		conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName());
+		conf.setMaster("local");
+		conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+		conf.registerKryoClasses(ModelSupport.getOafModelClasses());
+		spark = SparkSession.builder().config(conf).getOrCreate();
+	}
 
-    @BeforeEach
-    public void beforeEach() throws IOException, URISyntaxException {
-        dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI());
-        checkpointPath = workingDir.resolve("grouped_entity");
-        outputPath = workingDir.resolve("dispatched_entity");
-    }
+	@BeforeEach
+	public void beforeEach() throws IOException, URISyntaxException {
+		dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI());
+		checkpointPath = workingDir.resolve("grouped_entity");
+		outputPath = workingDir.resolve("dispatched_entity");
+	}
 
-    @AfterAll
-    public static void afterAll() throws IOException {
-        spark.stop();
-        FileUtils.deleteDirectory(workingDir.toFile());
-    }
+	@AfterAll
+	public static void afterAll() throws IOException {
+		spark.stop();
+		FileUtils.deleteDirectory(workingDir.toFile());
+	}
 
-    @Test
-    @Order(1)
-    void testGroupEntities() throws Exception {
-        GroupEntitiesSparkJob.main(new String[]{
-            "-isSparkSessionManaged",
-            Boolean.FALSE.toString(),
-            "-graphInputPath",
-            dataInputPath.toString(),
-            "-checkpointPath",
-            checkpointPath.toString(),
-            "-outputPath",
-            outputPath.toString(),
-            "-filterInvisible",
-            Boolean.FALSE.toString()
-        });
+	@Test
+	@Order(1)
+	void testGroupEntities() throws Exception {
+		GroupEntitiesSparkJob.main(new String[] {
+			"-isSparkSessionManaged",
+			Boolean.FALSE.toString(),
+			"-graphInputPath",
+			dataInputPath.toString(),
+			"-checkpointPath",
+			checkpointPath.toString(),
+			"-outputPath",
+			outputPath.toString(),
+			"-filterInvisible",
+			Boolean.FALSE.toString()
+		});
 
-        Dataset<OafEntity> checkpointTable = spark
-            .read()
-            .load(checkpointPath.toString())
-            .selectExpr("COALESCE(*)")
-            .as(Encoders.kryo(OafEntity.class));
+		Dataset<OafEntity> checkpointTable = spark
+			.read()
+			.load(checkpointPath.toString())
+			.selectExpr("COALESCE(*)")
+			.as(Encoders.kryo(OafEntity.class));
 
+		assertEquals(
+			1,
+			checkpointTable
+				.filter(
+					(FilterFunction<OafEntity>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9"
+						.equals(r.getId()) &&
+						r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo")))
+				.count());
 
-        assertEquals(
-            1,
-            checkpointTable
-                .filter(
-                    (FilterFunction<OafEntity>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9"
-                        .equals(r.getId()) &&
-                        r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo")))
-                .count());
+		Dataset<Result> output = spark
+			.read()
+			.textFile(
+				DHPUtils
+					.toSeq(
+						HdfsSupport
+							.listFiles(outputPath.toString(), spark.sparkContext().hadoopConfiguration())))
+			.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class));
 
-
-        Dataset<Result> output = spark
-            .read()
-            .textFile(
-                DHPUtils
-                    .toSeq(
-                        HdfsSupport
-                            .listFiles(outputPath.toString(), spark.sparkContext().hadoopConfiguration())))
-            .map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class));
-
-        assertEquals(3, output.count());
-        assertEquals(
-            2,
-            output
-                .map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
-                .filter((FilterFunction<String>) s -> s.equals("publication"))
-                .count());
-        assertEquals(
-            1,
-            output
-                .map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
-                .filter((FilterFunction<String>) s -> s.equals("dataset"))
-                .count());
-    }
-}
\ No newline at end of file
+		assertEquals(3, output.count());
+		assertEquals(
+			2,
+			output
+				.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
+				.filter((FilterFunction<String>) s -> s.equals("publication"))
+				.count());
+		assertEquals(
+			1,
+			output
+				.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
+				.filter((FilterFunction<String>) s -> s.equals("dataset"))
+				.count());
+	}
+}