forked from D-Net/dnet-hadoop
[Tagging Projects and Datasource] first extention of bulktagging to add the context to projects and datasource
This commit is contained in:
parent
3f7d262a4e
commit
6e1f383e4a
|
@ -6,6 +6,7 @@ import java.io.IOException;
|
|||
import java.io.InputStreamReader;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
import java.util.List;
|
||||
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
|
@ -77,4 +78,5 @@ public class QueryCommunityAPI {
|
|||
return body;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -3,14 +3,13 @@ package eu.dnetlib.dhp.api;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.management.Query;
|
||||
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||
import eu.dnetlib.dhp.schema.oaf.Organization;
|
||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -24,7 +23,6 @@ import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration;
|
|||
import eu.dnetlib.dhp.bulktag.community.Provider;
|
||||
import eu.dnetlib.dhp.bulktag.criteria.VerbResolver;
|
||||
import eu.dnetlib.dhp.bulktag.criteria.VerbResolverFactory;
|
||||
import eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob;
|
||||
|
||||
/**
|
||||
* @author miriam.baglioni
|
||||
|
@ -58,7 +56,7 @@ public class Utils implements Serializable {
|
|||
if (d.getEnabled() == null || Boolean.FALSE.equals(d.getEnabled()))
|
||||
return null;
|
||||
Provider p = new Provider();
|
||||
p.setOpenaireId("10|" + d.getOpenaireId());
|
||||
p.setOpenaireId(ModelSupport.getIdPrefix(Datasource.class)+"|" + d.getOpenaireId());
|
||||
p.setSelectionConstraints(d.getSelectioncriteria());
|
||||
if (p.getSelectionConstraints() != null)
|
||||
p.getSelectionConstraints().setSelection(resolver);
|
||||
|
@ -113,6 +111,7 @@ public class Utils implements Serializable {
|
|||
*/
|
||||
public static CommunityEntityMap getCommunityOrganization(String baseURL) throws IOException {
|
||||
CommunityEntityMap organizationMap = new CommunityEntityMap();
|
||||
String entityPrefix = ModelSupport.getIdPrefix(Organization.class);
|
||||
getValidCommunities(baseURL)
|
||||
.forEach(community -> {
|
||||
String id = community.getId();
|
||||
|
@ -124,9 +123,9 @@ public class Utils implements Serializable {
|
|||
if (!organizationMap
|
||||
.keySet()
|
||||
.contains(
|
||||
"20|" + o))
|
||||
organizationMap.put("20|" + o, new ArrayList<>());
|
||||
organizationMap.get("20|" + o).add(community.getId());
|
||||
entityPrefix + "|" + o))
|
||||
organizationMap.put(entityPrefix + "|" + o, new ArrayList<>());
|
||||
organizationMap.get(entityPrefix + "|" + o).add(community.getId());
|
||||
});
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
|
@ -138,7 +137,7 @@ public class Utils implements Serializable {
|
|||
|
||||
public static CommunityEntityMap getCommunityProjects(String baseURL) throws IOException {
|
||||
CommunityEntityMap projectMap = new CommunityEntityMap();
|
||||
|
||||
String entityPrefix = ModelSupport.getIdPrefix(Project.class);
|
||||
getValidCommunities(baseURL)
|
||||
.forEach(community -> {
|
||||
int page = -1;
|
||||
|
@ -155,9 +154,9 @@ public class Utils implements Serializable {
|
|||
ContentModel.class);
|
||||
if (cm.getContent().size() > 0) {
|
||||
cm.getContent().forEach(p -> {
|
||||
if (!projectMap.keySet().contains("40|" + p.getOpenaireId()))
|
||||
projectMap.put("40|" + p.getOpenaireId(), new ArrayList<>());
|
||||
projectMap.get("40|" + p.getOpenaireId()).add(community.getId());
|
||||
if (!projectMap.keySet().contains(entityPrefix + "|" + p.getOpenaireId()))
|
||||
projectMap.put(entityPrefix + "|" + p.getOpenaireId(), new ArrayList<>());
|
||||
projectMap.get(entityPrefix + "|" + p.getOpenaireId()).add(community.getId());
|
||||
});
|
||||
}
|
||||
} catch (IOException e) {
|
||||
|
@ -174,4 +173,28 @@ public class Utils implements Serializable {
|
|||
.map(community -> community.getId())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public static List<EntityCommunities> getDatasourceCommunities(String baseURL)throws IOException{
|
||||
List<CommunityModel> validCommunities = getValidCommunities(baseURL);
|
||||
HashMap<String, Set<String>> map = new HashMap<>();
|
||||
validCommunities.forEach(c -> {
|
||||
try {
|
||||
new ObjectMapper().readValue(QueryCommunityAPI.communityDatasource(c.getId(), baseURL), DatasourceList.class)
|
||||
.forEach(d -> {
|
||||
if (!map.keySet().contains(d.getOpenaireId()))
|
||||
map.put(d.getOpenaireId(), new HashSet<>());
|
||||
|
||||
map.get(d.getOpenaireId()).add(c.getId());
|
||||
});
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
return map.keySet().stream().map(k -> EntityCommunities.newInstance(k, map.get(k).stream().collect(Collectors.toList()))).collect(Collectors.toList());
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
package eu.dnetlib.dhp.api.model;
|
||||
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author miriam.baglioni
|
||||
* @Date 13/02/24
|
||||
*/
|
||||
public class EntityCommunities implements Serializable {
|
||||
private String entityId;
|
||||
private List<String> communitiesId;
|
||||
|
||||
public String getEntityId() {
|
||||
return entityId;
|
||||
}
|
||||
|
||||
public void setEntityId(String entityId) {
|
||||
this.entityId = entityId;
|
||||
}
|
||||
|
||||
public List<String> getCommunitiesId() {
|
||||
return communitiesId;
|
||||
}
|
||||
|
||||
public void setCommunitiesId(List<String> communitiesId) {
|
||||
this.communitiesId = communitiesId;
|
||||
}
|
||||
|
||||
public static EntityCommunities newInstance(String dsid, List<String> csid){
|
||||
EntityCommunities dsc = new EntityCommunities();
|
||||
dsc.entityId = dsid;
|
||||
dsc.communitiesId = csid;
|
||||
return dsc;
|
||||
}
|
||||
}
|
|
@ -5,7 +5,14 @@ import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir;
|
|||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import eu.dnetlib.dhp.api.model.CommunityEntityMap;
|
||||
import eu.dnetlib.dhp.api.model.EntityCommunities;
|
||||
import eu.dnetlib.dhp.api.model.DatasourceCommunitiesList;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.Context;
|
||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
|
@ -23,11 +30,11 @@ import com.google.gson.Gson;
|
|||
import eu.dnetlib.dhp.api.Utils;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.bulktag.community.*;
|
||||
import eu.dnetlib.dhp.schema.common.EntityType;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class SparkBulkTagJob {
|
||||
|
||||
|
@ -89,9 +96,80 @@ public class SparkBulkTagJob {
|
|||
spark -> {
|
||||
extendCommunityConfigurationForEOSC(spark, inputPath, cc);
|
||||
execBulkTag(spark, inputPath, outputPath, protoMappingParams, cc);
|
||||
execDatasourceTag(spark, inputPath, outputPath, Utils.getDatasourceCommunities(baseURL));
|
||||
execProjectTag(spark, inputPath, outputPath, Utils.getCommunityProjects(baseURL));
|
||||
});
|
||||
}
|
||||
|
||||
private static void execProjectTag(SparkSession spark, String inputPath, String outputPath, CommunityEntityMap communityProjects) {
|
||||
Dataset<Project> projects = readPath(spark, inputPath + "project", Project.class);
|
||||
Dataset<EntityCommunities> pc = spark.createDataset(communityProjects.keySet().stream().map(k -> EntityCommunities.newInstance(k, communityProjects.get(k))).collect(Collectors.toList()), Encoders.bean(EntityCommunities.class));
|
||||
|
||||
projects.joinWith(pc, projects.col("id").equalTo(pc.col("entityId")), "left")
|
||||
.map((MapFunction<Tuple2<Project, EntityCommunities>, Project>) t2 -> {
|
||||
Project ds = t2._1();
|
||||
if (t2._2() != null){
|
||||
List<String> context = ds.getContext().stream().map(c -> c.getId()).collect(Collectors.toList());
|
||||
t2._2().getCommunitiesId().forEach(c -> {
|
||||
if(!context.contains(c)){
|
||||
Context con = new Context();
|
||||
con.setId(c);
|
||||
con.setDataInfo(Arrays.asList(OafMapperUtils.dataInfo(false,TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false,
|
||||
OafMapperUtils.qualifier(TaggingConstants.CLASS_ID_DATASOURCE, TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "1")));
|
||||
ds.getContext().add(con)
|
||||
}
|
||||
});
|
||||
}
|
||||
return ds;
|
||||
} ,Encoders.bean(Project.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression","gzip")
|
||||
.json(outputPath + "project");
|
||||
|
||||
readPath(spark, outputPath + "project", Datasource.class)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression","gzip")
|
||||
.json(inputPath + "project");
|
||||
}
|
||||
|
||||
|
||||
private static void execDatasourceTag(SparkSession spark, String inputPath, String outputPath, List<EntityCommunities> datasourceCommunities) {
|
||||
Dataset<Datasource> datasource = readPath(spark, inputPath + "datasource", Datasource.class);
|
||||
|
||||
Dataset<EntityCommunities> dc = spark.createDataset(datasourceCommunities, Encoders.bean(EntityCommunities.class));
|
||||
|
||||
datasource.joinWith(dc, datasource.col("id").equalTo(dc.col("entityId")), "left")
|
||||
.map((MapFunction<Tuple2<Datasource, EntityCommunities>, Datasource>) t2 -> {
|
||||
Datasource ds = t2._1();
|
||||
if (t2._2() != null){
|
||||
List<String> context = ds.getContext().stream().map(c -> c.getId()).collect(Collectors.toList());
|
||||
t2._2().getCommunitiesId().forEach(c -> {
|
||||
if(!context.contains(c)){
|
||||
Context con = new Context();
|
||||
con.setId(c);
|
||||
con.setDataInfo(Arrays.asList(OafMapperUtils.dataInfo(false,TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false,
|
||||
OafMapperUtils.qualifier(TaggingConstants.CLASS_ID_DATASOURCE, TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "1")));
|
||||
ds.getContext().add(con)
|
||||
}
|
||||
});
|
||||
}
|
||||
return ds;
|
||||
} ,Encoders.bean(Datasource.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression","gzip")
|
||||
.json(outputPath + "datasource");
|
||||
|
||||
readPath(spark, outputPath + "datasource", Datasource.class)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression","gzip")
|
||||
.json(inputPath + "datasource");
|
||||
}
|
||||
|
||||
|
||||
private static void extendCommunityConfigurationForEOSC(SparkSession spark, String inputPath,
|
||||
CommunityConfiguration cc) {
|
||||
|
||||
|
@ -182,4 +260,6 @@ public class SparkBulkTagJob {
|
|||
};
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -888,7 +888,7 @@
|
|||
<mockito-core.version>3.3.3</mockito-core.version>
|
||||
<mongodb.driver.version>3.4.2</mongodb.driver.version>
|
||||
<vtd.version>[2.12,3.0)</vtd.version>
|
||||
<dhp-schemas.version>[4.17.3]</dhp-schemas.version>
|
||||
<dhp-schemas.version>[5.17.3]</dhp-schemas.version>
|
||||
<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
|
||||
<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
|
||||
<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>
|
||||
|
|
Loading…
Reference in New Issue