context propagation

Miriam Baglioni 2021-02-08 10:32:13 +01:00
parent 6190465851
commit 7572069f98
18 changed files with 955 additions and 0 deletions

View File: dhp-contextpropagation/pom.xml

@@ -0,0 +1,91 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId>
<version>1.2.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-contextpropagation</artifactId>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.0.1</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>initialize</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-graph-provision-scholexplorer</artifactId>
<version>1.2.4-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File: Costants.java (package eu.dnetlib.dhp.contextpropagation)

@@ -0,0 +1,77 @@
package eu.dnetlib.dhp.contextpropagation;
import com.google.common.collect.Maps;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Map;
public class Costants implements Serializable {
private static Map<String, PropagationUse> publicationDatasetSemantics = Maps.newHashMap();
static {
publicationDatasetSemantics.put("issupplementedby", PropagationUse.newInstance("latent", "1.0", new HashSet<>()));
publicationDatasetSemantics.put("cites", PropagationUse.newInstance("reuse", "1.0", new HashSet<>()));
publicationDatasetSemantics.put("describes", PropagationUse.newInstance("latent", "1.0", new HashSet<>()));
publicationDatasetSemantics.put("references", PropagationUse.newInstance("reuse", "1.0", new HashSet<>()));
publicationDatasetSemantics.put("documents", PropagationUse.newInstance("latent", "1.0", new HashSet<>()));
}
private static Map<String, PropagationUse> datasetDatasetSemantics = Maps.newHashMap();
static{
datasetDatasetSemantics.put("isdescribedby",PropagationUse.newInstance("latent", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("iscitedby",PropagationUse.newInstance("reuse", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("cites",PropagationUse.newInstance("reuse", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("issupplementedby",PropagationUse.newInstance("latent", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("issupplementto",PropagationUse.newInstance("latent", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("iscontinuedby",PropagationUse.newInstance("proxy", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("continues",PropagationUse.newInstance("proxy", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("hasversion",PropagationUse.newInstance("proxy", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("isversionof",PropagationUse.newInstance("proxy", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("isnewversionof",PropagationUse.newInstance("proxy", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("ispreviousversionof",PropagationUse.newInstance("proxy", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("ispartof",PropagationUse.newInstance("latent", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("haspart",PropagationUse.newInstance("latent", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("references",PropagationUse.newInstance("reuse", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("isreferencedby",PropagationUse.newInstance("reuse", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("documents",PropagationUse.newInstance("latent", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("isdocumentedby",PropagationUse.newInstance("latent", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("isvariantformof",PropagationUse.newInstance("proxy", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("isoriginalformof",PropagationUse.newInstance("proxy", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("isidenticalto",PropagationUse.newInstance("proxy", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("obsoletes",PropagationUse.newInstance("proxy", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("isobsoletedby",PropagationUse.newInstance("proxy", "1.0", new HashSet<>()));
}
public static Map<String, PropagationUse> getPublicationDatasetSemantics() {
return publicationDatasetSemantics;
}
public static Map<String, PropagationUse> getDatasetDatasetSemantics() {
return datasetDatasetSemantics;
}
public static boolean containedInPubSem(String sem){
return publicationDatasetSemantics.containsKey(sem);
}
public static boolean containedInDatsSem(String sem){
return datasetDatasetSemantics.containsKey(sem);
}
public static PropagationUse getPublicationValue(String sem){
return publicationDatasetSemantics.get(sem);
}
public static PropagationUse getDatasetValue(String sem){
return datasetDatasetSemantics.get(sem);
}
}
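
As an aside, a minimal sketch (the wrapper object and identifiers are hypothetical, not part of the commit) of how these lookup tables are meant to be consulted: relation names are lower-case keys, and each entry carries the propagation use (latent, reuse or proxy) together with its starting weight.

package eu.dnetlib.dhp.contextpropagation
object CostantsExample {
def main(args: Array[String]): Unit = {
val semantics = "issupplementedby" // relation name, already lower-cased
if (Costants.containedInPubSem(semantics)) {
val use = Costants.getPublicationValue(semantics)
println(s"${use.getUse} (weight ${use.getWeight})") // prints: latent (weight 1.0)
}
}
}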

View File: DatasetPropagationStructure.java (package eu.dnetlib.dhp.contextpropagation)

@@ -0,0 +1,19 @@
package eu.dnetlib.dhp.contextpropagation;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
public class DatasetPropagationStructure implements Serializable {
private Map<String, PropagationUse> propagation = new HashMap<>();
public Map<String, PropagationUse> getPropagation() {
return propagation;
}
public void add(String key, PropagationUse value){
propagation.put(key, value);
}
}

View File: Node.java (package eu.dnetlib.dhp.contextpropagation)

@@ -0,0 +1,32 @@
package eu.dnetlib.dhp.contextpropagation;
import java.io.Serializable;
import java.util.List;
public class Node implements Serializable {
private String id;
private List<String> publisher;
public List<String> getPublisher() {
return publisher;
}
public void setPublisher(List<String> publisher) {
this.publisher = publisher;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public static Node newInstance(String id, List<String> publisher){
Node n = new Node();
n.id = id;
n.publisher = publisher;
return n;
}
}

View File: PropagationAggregator.scala (package eu.dnetlib.dhp.contextpropagation)

@@ -0,0 +1,36 @@
package eu.dnetlib.dhp.contextpropagation
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
object PropagationAggregator {
def getDatasetAggregator(): Aggregator[(String, PropagationStructure), PropagationStructure, PropagationStructure] = new Aggregator[(String, PropagationStructure), PropagationStructure, PropagationStructure]{
override def zero: PropagationStructure = new PropagationStructure()
override def reduce(b: PropagationStructure, a: (String, PropagationStructure)): PropagationStructure = {
b.mergeFrom(a._2)
}
override def merge(wx: PropagationStructure, wy: PropagationStructure): PropagationStructure = {
wx.mergeFrom(wy)
}
override def finish(reduction: PropagationStructure): PropagationStructure = reduction
override def bufferEncoder: Encoder[PropagationStructure] =
Encoders.kryo(classOf[PropagationStructure])
override def outputEncoder: Encoder[PropagationStructure] =
Encoders.kryo(classOf[PropagationStructure])
}
}

View File: PropagationStructure.java (package eu.dnetlib.dhp.contextpropagation)

@@ -0,0 +1,48 @@
package eu.dnetlib.dhp.contextpropagation;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class PropagationStructure implements Serializable {
private Map<String, List<PropagationUse>> propagation = new HashMap<>();
public Map<String, List<PropagationUse>> getPropagation() {
return propagation;
}
public void add(String key, List<PropagationUse> value){
propagation.put(key, value);
}
public void setPropagation(Map<String, List<PropagationUse>> propagation) {
this.propagation = propagation;
}
private void mergeList(PropagationUse use, List<PropagationUse> acc){
for(PropagationUse pu: acc){
if (use.getUse().equals(pu.getUse())){
// same use already present: merge the paths and keep the larger weight
pu.getPath().addAll(use.getPath());
if (Double.parseDouble(pu.getWeight()) < Double.parseDouble(use.getWeight())){
pu.setWeight(use.getWeight());
}
return;
}
}
acc.add(use);
}
public PropagationStructure mergeFrom(PropagationStructure ps){
for(String key : ps.propagation.keySet()){
if (propagation.containsKey(key)){
ps.propagation.get(key).forEach( use -> mergeList(use, propagation.get(key)));
}else{
propagation.put(key, ps.propagation.get(key).stream().map(pu -> PropagationUse.copyInstance(pu)).collect(Collectors.toList()));
}
}
return this;
}
}
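
A small illustrative sketch of the merge semantics implemented above (the identifier pub::1 is hypothetical): when two structures carry the same key and the same use, their paths are unioned and the larger weight is kept; entries for new keys are copied over.

val list1 = new java.util.ArrayList[PropagationUse]()
list1.add(PropagationUse.newInstance("reuse", "0.8", new java.util.HashSet[String]()))
val list2 = new java.util.ArrayList[PropagationUse]()
list2.add(PropagationUse.newInstance("reuse", "1.0", new java.util.HashSet[String]()))

val a = new PropagationStructure
a.add("pub::1", list1)
val b = new PropagationStructure
b.add("pub::1", list2)

a.mergeFrom(b) // a now holds a single reuse entry for pub::1 with weight 1.0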

View File: PropagationUse.java (package eu.dnetlib.dhp.contextpropagation)

@@ -0,0 +1,54 @@
package eu.dnetlib.dhp.contextpropagation;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
public class PropagationUse implements Serializable {
private String use;
private String weight;
private Set<String> path;
public String getUse() {
return use;
}
public void setUse(String use) {
this.use = use;
}
public String getWeight() {
return weight;
}
public void setWeight(String weight) {
this.weight = weight;
}
public Set<String> getPath() {
return path;
}
public void setPath(Set<String> path) {
this.path = path;
}
public static PropagationUse newInstance(String use, String weight, Set<String> path){
PropagationUse pu = new PropagationUse();
pu.use = use;
pu.weight = weight;
pu.path = path;
return pu;
}
public static PropagationUse copyInstance(PropagationUse use){
PropagationUse pu = new PropagationUse();
// copy the path into a fresh set so that later additions do not modify the original instance
pu.path = new HashSet<>(use.path);
pu.weight = use.weight;
pu.use = use.use;
return pu;
}
}
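
For clarity, a short sketch of the copy semantics the propagation step relies on (the identifier below is hypothetical): copyInstance yields an instance whose path can be extended without touching the original, which is what propagateDataset in SparkContextPropagation assumes when it appends the traversed dataset id.

val original = PropagationUse.newInstance("latent", "1.0", new java.util.HashSet[String]())
val copy = PropagationUse.copyInstance(original)
copy.getPath.add("60|hypothetical::dataset") // only the copy records the traversed node
// original.getPath is still empty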

View File: RelationPropagation.java (package eu.dnetlib.dhp.contextpropagation)

@@ -0,0 +1,33 @@
package eu.dnetlib.dhp.contextpropagation;
import java.io.Serializable;
public class RelationPropagation implements Serializable {
private Node source;
private Node target;
private String semantics;
public Node getSource() {
return source;
}
public void setSource(Node source) {
this.source = source;
}
public Node getTarget() {
return target;
}
public void setTarget(Node target) {
this.target = target;
}
public String getSemantics() {
return semantics;
}
public void setSemantics(String semantics) {
this.semantics = semantics;
}
}

View File: SparkContextPropagation.scala (package eu.dnetlib.dhp.contextpropagation)

@@ -0,0 +1,127 @@
package eu.dnetlib.dhp.contextpropagation
import java.util
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.provision.SparkConvertDatasetToJson
import eu.dnetlib.dhp.provision.scholix.{Scholix, ScholixEntityId}
import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary
import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import scala.collection.JavaConverters._
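/*
 * Sketch of the flow implemented below:
 *  1. read the Scholix relations and drop entities whose dnet identifier starts with "70";
 *  2. keep publication -> dataset relations (prefixes 50 -> 60) whose semantics appear in Costants,
 *     and dataset -> dataset relations (60 -> 60) likewise;
 *  3. aggregate, per dataset, the PropagationUse entries coming from publications (pl1);
 *  4. join pl1 with the dataset -> dataset links and propagate each use one hop further
 *     whenever the combined weight stays above 0.3 (pl2).
 */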
object SparkContextPropagation {
def main(args: Array[String]): Unit = {
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertDatasetToJson.getClass.getResourceAsStream("/eu/dnetlib/dhp/provision/dataset2Json.json")))
parser.parseArgument(args)
val conf = new SparkConf
val spark = SparkSession.builder.config(conf).appName(SparkConvertDatasetToJson.getClass.getSimpleName).master(parser.get("master")).getOrCreate
implicit val summaryEncoder: Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]
implicit val propagationEncoder: Encoder[RelationPropagation] = Encoders.kryo[RelationPropagation]
implicit val mapEncoderPub: Encoder[PropagationStructure] = Encoders.kryo[PropagationStructure]
implicit val mapEncoderDats: Encoder[DatasetPropagationStructure] = Encoders.kryo[DatasetPropagationStructure]
implicit val tupleForPropagation: Encoder[(String, PropagationStructure)] = Encoders.tuple(Encoders.STRING, mapEncoderPub)
implicit val tupleForPropagationDars: Encoder[(String, DatasetPropagationStructure)] = Encoders.tuple(Encoders.STRING, mapEncoderDats)
implicit val stringEncoder: Encoder[String] = Encoders.STRING
val workingPath = parser.get("workingPath")
def getPublisherList(item: List[ScholixEntityId]) : util.List[String] =
{
item.map(entry => entry.getName).asJava
}
def propagateDataset (item: ((String, PropagationStructure), (String, DatasetPropagationStructure))) : (String, PropagationStructure) = {
val propagation = item._1._2.getPropagation.asScala
val dsprob : DatasetPropagationStructure= item._2._2
val source = dsprob.getPropagation.keySet().iterator().next()
val dic = new scala.collection.mutable.HashMap[String, util.List[PropagationUse]]
propagation.keysIterator.foreach(key => {
val entries = propagation.get(key).get.asScala
entries.foreach(entry => {
if((entry.getUse == dsprob.getPropagation.get(source).getUse || dsprob.getPropagation.get(source).getUse == "proxy")
&& !entry.getPath.contains(source)) {
// weights are decimal strings such as "1.0", so combine them as doubles
val new_p = entry.getWeight.toDouble * dsprob.getPropagation.get(source).getWeight.toDouble
if (new_p > 0.3){
val newentry : PropagationUse = PropagationUse.copyInstance(entry)
newentry.setWeight(String.valueOf(new_p))
newentry.getPath.add(source)
dic.getOrElseUpdate(key, new util.ArrayList[PropagationUse]()).add(newentry)
}
}
})
})
var ps: PropagationStructure = new PropagationStructure
ps.setPropagation(dic.asJava)
(source, ps)
}
spark.read.load(s"$workingPath/summary").as[ScholixSummary]
.map(s => new ObjectMapper().writeValueAsString(s))(Encoders.STRING)
.rdd.repartition(500).saveAsTextFile(s"$workingPath/summary_json", classOf[GzipCodec])
val allowedRelations : Dataset[RelationPropagation] = spark.read.load(s"$workingPath/scholix").as[Scholix]
.filter(s => !s.getSource().getDnetIdentifier().substring(0,2).equals("70") )
.filter(s => !s.getTarget().getDnetIdentifier().substring(0,2).equals("70"))
.map(s => {
val rp = new RelationPropagation
rp.setSource(Node.newInstance(s.getSource.getDnetIdentifier, getPublisherList(s.getSource.getPublisher.asScala.toList)))
rp.setTarget(Node.newInstance(s.getTarget.getDnetIdentifier, getPublisherList(s.getTarget.getPublisher.asScala.toList)))
rp.setSemantics(s.getRelationship.getName.toLowerCase) // relation names are matched against lower-case keys in Costants
rp
})
val pubs_rel : Dataset[RelationPropagation] = allowedRelations
.filter(r => r.getSource.getId.substring(0,2) == "50"
&& r.getTarget.getId.substring(0,2) == "60"
&& Costants.containedInPubSem(r.getSemantics))
val dats_rel : Dataset[RelationPropagation] = allowedRelations
.filter(r => r.getSource.getId.substring(0,2) == "60"
&& r.getTarget.getId.substring(0,2) == "60"
&& Costants.containedInDatsSem(r.getSemantics)
&& r.getSource.getId != r.getTarget.getId)
val publication_dataset : Dataset[(String, PropagationStructure)] = pubs_rel.map(r => {
val ps = new PropagationStructure
val pv : List[PropagationUse] = List(PropagationUse.copyInstance(Costants.getPublicationValue(r.getSemantics)))
ps.add(r.getSource.getId, pv.asJava)
(r.getTarget.getId, ps)
})
val dataset_dataset : Dataset[(String, DatasetPropagationStructure)] = dats_rel.map(r => {
val ps = new DatasetPropagationStructure
ps.add(r.getTarget.getId, PropagationUse.copyInstance(Costants.getDatasetValue(r.getSemantics)))
(r.getSource.getId, ps)
})
val pl1 : Dataset[(String, PropagationStructure)] = publication_dataset.groupByKey(_._1)
.agg(PropagationAggregator.getDatasetAggregator().toColumn)
val pl2_step1 : Dataset [(String, PropagationStructure)] = pl1.joinWith(dataset_dataset, pl1("_1").equalTo(dataset_dataset("_1")))
.map(propagateDataset)
val pl2 : Dataset [(String, PropagationStructure)] = pl1.union(pl2_step1).groupByKey(_._1)
.agg(PropagationAggregator.getDatasetAggregator().toColumn)
}
}
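
To make the propagation rule concrete, a worked sketch with hypothetical weights: if a publication reaches dataset D1 with weight 1.0 and D1 is linked to D2 by a relation whose weight is 1.0, propagateDataset multiplies the two (1.0 * 1.0 = 1.0 > 0.3) and carries the use on to D2, recording D1 in the path; the hop is skipped as soon as the combined weight drops to 0.3 or below, or when D1 already appears in the path.

val entryWeight = "1.0".toDouble // weight accumulated on the publication -> dataset side
val linkWeight = "1.0".toDouble // weight of the dataset -> dataset relation being traversed
val combined = entryWeight * linkWeight
val propagates = combined > 0.3 // true: the context is carried one hop further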

View File: Costants.java (package eu.dnetlib.dhp.cantextpropagation)

@@ -0,0 +1,77 @@
package eu.dnetlib.dhp.cantextpropagation;
import com.google.common.collect.Maps;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Map;
public class Costants implements Serializable {
private static Map<String, PropagationUse> publicationDatasetSemantics = Maps.newHashMap();
static {
publicationDatasetSemantics.put("issupplementedby", PropagationUse.newInstance("latent", "1.0", new HashSet<>()));
publicationDatasetSemantics.put("cites", PropagationUse.newInstance("reuse", "1.0", new HashSet<>()));
publicationDatasetSemantics.put("describes", PropagationUse.newInstance("latent", "1.0", new HashSet<>()));
publicationDatasetSemantics.put("references", PropagationUse.newInstance("reuse", "1.0", new HashSet<>()));
publicationDatasetSemantics.put("documents", PropagationUse.newInstance("latent", "1.0", new HashSet<>()));
}
private static Map<String, PropagationUse> datasetDatasetSemantics = Maps.newHashMap();
static{
datasetDatasetSemantics.put("isdescribedby",PropagationUse.newInstance("latent", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("iscitedby",PropagationUse.newInstance("reuse", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("cites",PropagationUse.newInstance("reuse", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("issupplementedby",PropagationUse.newInstance("latent", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("issupplementto",PropagationUse.newInstance("latent", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("iscontinuedby",PropagationUse.newInstance("proxy", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("continues",PropagationUse.newInstance("proxy", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("hasversion",PropagationUse.newInstance("proxy", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("isversionof",PropagationUse.newInstance("proxy", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("isnewversionof",PropagationUse.newInstance("proxy", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("ispreviousversionof",PropagationUse.newInstance("proxy", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("ispartof",PropagationUse.newInstance("latent", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("haspart",PropagationUse.newInstance("latent", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("references",PropagationUse.newInstance("reuse", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("isreferencedby",PropagationUse.newInstance("reuse", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("documents",PropagationUse.newInstance("latent", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("isdocumentedby",PropagationUse.newInstance("latent", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("isvariantformof",PropagationUse.newInstance("proxy", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("isoriginalformof",PropagationUse.newInstance("proxy", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("isidenticalto",PropagationUse.newInstance("proxy", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("obsoletes",PropagationUse.newInstance("proxy", "1.0", new HashSet<>()));
datasetDatasetSemantics.put("isobsoletedby",PropagationUse.newInstance("proxy", "1.0", new HashSet<>()));
}
public static Map<String, PropagationUse> getPublicationDatasetSemantics() {
return publicationDatasetSemantics;
}
public static Map<String, PropagationUse> getDatasetDatasetSemantics() {
return datasetDatasetSemantics;
}
public static boolean containedInPubSem(String sem){
return publicationDatasetSemantics.containsKey(sem);
}
public static boolean containedInDatsSem(String sem){
return datasetDatasetSemantics.containsKey(sem);
}
public static PropagationUse getPublicationValue(String sem){
return publicationDatasetSemantics.get(sem);
}
public static PropagationUse getDatasetValue(String sem){
return datasetDatasetSemantics.get(sem);
}
}

View File: DatasetPropagationStructure.java (package eu.dnetlib.dhp.cantextpropagation)

@@ -0,0 +1,18 @@
package eu.dnetlib.dhp.cantextpropagation;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
public class DatasetPropagationStructure implements Serializable {
private Map<String, PropagationUse> propagation = new HashMap<>();
public Map<String, PropagationUse> getPropagation() {
return propagation;
}
public void add(String key, PropagationUse value){
propagation.put(key, value);
}
}

View File: Node.java (package eu.dnetlib.dhp.cantextpropagation)

@@ -0,0 +1,33 @@
package eu.dnetlib.dhp.cantextpropagation;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class Node implements Serializable {
private String id;
// private List<String> publisher = new ArrayList<>();
//
// public List<String> getPublisher() {
// return publisher;
// }
//
// public void setPublisher(ArrayList<String> publisher) {
// this.publisher = publisher;
// }
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public static Node newInstance(String id){//}, ArrayList<String> publisher){
Node n = new Node();
n.id = id;
// n.publisher = publisher;
return n;
}
}

View File: PropagationAggregator.scala (package eu.dnetlib.dhp.cantextpropagation)

@@ -0,0 +1,36 @@
package eu.dnetlib.dhp.cantextpropagation
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}
object PropagationAggregator {
def getDatasetAggregator(): Aggregator[(String, PropagationStructure), PropagationStructure, PropagationStructure] = new Aggregator[(String, PropagationStructure), PropagationStructure, PropagationStructure]{
override def zero: PropagationStructure = new PropagationStructure()
override def reduce(b: PropagationStructure, a: (String, PropagationStructure)): PropagationStructure = {
b.mergeFrom(a._2)
}
override def merge(wx: PropagationStructure, wy: PropagationStructure): PropagationStructure = {
wx.mergeFrom(wy)
}
override def finish(reduction: PropagationStructure): PropagationStructure = reduction
override def bufferEncoder: Encoder[PropagationStructure] =
Encoders.kryo(classOf[PropagationStructure])
override def outputEncoder: Encoder[PropagationStructure] =
Encoders.kryo(classOf[PropagationStructure])
}
}

View File: PropagationStructure.java (package eu.dnetlib.dhp.cantextpropagation)

@@ -0,0 +1,49 @@
package eu.dnetlib.dhp.cantextpropagation;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class PropagationStructure implements Serializable {
private Map<String, List<PropagationUse>> propagation = new HashMap<>();
public Map<String, List<PropagationUse>> getPropagation() {
return propagation;
}
public void add(String key, List<PropagationUse> value){
propagation.put(key, value);
}
public void setPropagation(Map<String, List<PropagationUse>> propagation) {
this.propagation = propagation;
}
private void mergeList(PropagationUse use, List<PropagationUse> acc){
for(PropagationUse pu: acc){
if (use.getUse().equals(pu.getUse())){
// same use already present: merge the paths and keep the larger weight
pu.getPath().addAll(use.getPath());
if (Double.parseDouble(pu.getWeight()) < Double.parseDouble(use.getWeight())){
pu.setWeight(use.getWeight());
}
return;
}
}
acc.add(use);
}
public PropagationStructure mergeFrom(PropagationStructure ps){
for(String key : ps.propagation.keySet()){
if (propagation.containsKey(key)){
ps.propagation.get(key).forEach( use -> mergeList(use, propagation.get(key)));
}else{
propagation.put(key, ps.propagation.get(key).stream().map(pu -> PropagationUse.copyInstance(pu)).collect(Collectors.toList()));
}
}
return this;
}
}

View File: PropagationUse.java (package eu.dnetlib.dhp.cantextpropagation)

@@ -0,0 +1,53 @@
package eu.dnetlib.dhp.cantextpropagation;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
public class PropagationUse implements Serializable {
private String use;
private String weight;
private Set<String> path;
public String getUse() {
return use;
}
public void setUse(String use) {
this.use = use;
}
public String getWeight() {
return weight;
}
public void setWeight(String weight) {
this.weight = weight;
}
public Set<String> getPath() {
return path;
}
public void setPath(Set<String> path) {
this.path = path;
}
public static PropagationUse newInstance(String use, String weight, Set<String> path){
PropagationUse pu = new PropagationUse();
pu.use = use;
pu.weight = weight;
pu.path = path;
return pu;
}
public static PropagationUse copyInstance(PropagationUse use){
PropagationUse pu = new PropagationUse();
// copy the path into a fresh set so that later additions do not modify the original instance
pu.path = new HashSet<>(use.path);
pu.weight = use.weight;
pu.use = use.use;
return pu;
}
}

View File: Publisher.java (package eu.dnetlib.dhp.cantextpropagation)

@@ -0,0 +1,11 @@
package eu.dnetlib.dhp.cantextpropagation;
import java.io.Serializable;
import java.util.ArrayList;
public class Publisher extends ArrayList<String> implements Serializable {
public Publisher(){super();}
}

View File: RelationPropagation.java (package eu.dnetlib.dhp.cantextpropagation)

@@ -0,0 +1,34 @@
package eu.dnetlib.dhp.cantextpropagation;
import java.io.Serializable;
public class RelationPropagation implements Serializable {
private Node source;
private Node target;
private String semantics;
public RelationPropagation(){}
public Node getSource() {
return source;
}
public void setSource(Node source) {
this.source = source;
}
public Node getTarget() {
return target;
}
public void setTarget(Node target) {
this.target = target;
}
public String getSemantics() {
return semantics;
}
public void setSemantics(String semantics) {
this.semantics = semantics;
}
}

View File: SparkContextPropagation.scala (package eu.dnetlib.dhp.cantextpropagation)

@@ -0,0 +1,127 @@
package eu.dnetlib.dhp.cantextpropagation
import java.util
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.provision.SparkConvertDatasetToJson
import eu.dnetlib.dhp.provision.scholix.{Scholix, ScholixEntityId}
import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary
import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import scala.collection.JavaConverters._
object SparkContextPropagation {
def main(args: Array[String]): Unit = {
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertDatasetToJson.getClass.getResourceAsStream("/eu/dnetlib/dhp/provision/dataset2Json.json")))
parser.parseArgument(args)
val conf = new SparkConf
val spark = SparkSession.builder.config(conf).appName(SparkConvertDatasetToJson.getClass.getSimpleName).master(parser.get("master")).getOrCreate
implicit val summaryEncoder: Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]
implicit val propagationEncoder: Encoder[RelationPropagation] = Encoders.kryo[RelationPropagation]
implicit val mapEncoderPub: Encoder[PropagationStructure] = Encoders.kryo[PropagationStructure]
implicit val mapEncoderDats: Encoder[DatasetPropagationStructure] = Encoders.kryo[DatasetPropagationStructure]
implicit val tupleForPropagation: Encoder[(String, PropagationStructure)] = Encoders.tuple(Encoders.STRING, mapEncoderPub)
implicit val tupleForPropagationDars: Encoder[(String, DatasetPropagationStructure)] = Encoders.tuple(Encoders.STRING, mapEncoderDats)
implicit val stringEncoder: Encoder[String] = Encoders.STRING
val workingPath = parser.get("workingPath")
def getPublisherList(item: List[ScholixEntityId]) : util.ArrayList[String] =
{
new java.util.ArrayList[String](item.map(p=>p.getName).asJava)
}
def propagateDataset (item: ((String, PropagationStructure), (String, DatasetPropagationStructure))) : (String, PropagationStructure) = {
val propagation = item._1._2.getPropagation.asScala
val dsprob : DatasetPropagationStructure= item._2._2
val source = dsprob.getPropagation.keySet().iterator().next()
val dic = new scala.collection.mutable.HashMap[String, util.List[PropagationUse]]
propagation.keysIterator.foreach(key => {
val entries = propagation.get(key).get.asScala
entries.foreach(entry => {
if((entry.getUse == dsprob.getPropagation.get(source).getUse || dsprob.getPropagation.get(source).getUse == "proxy")
&& !entry.getPath.contains(source)) {
// weights are decimal strings such as "1.0", so combine them as doubles
val new_p = entry.getWeight.toDouble * dsprob.getPropagation.get(source).getWeight.toDouble
if (new_p > 0.3){
val newentry : PropagationUse = PropagationUse.copyInstance(entry)
newentry.setWeight(String.valueOf(new_p))
newentry.getPath.add(source)
dic.getOrElseUpdate(key, new util.ArrayList[PropagationUse]()).add(newentry)
}
}
})
})
var ps: PropagationStructure = new PropagationStructure
ps.setPropagation(dic.asJava)
(source, ps)
}
spark.read.load(s"$workingPath/summary").as[ScholixSummary]
.map(s => new ObjectMapper().writeValueAsString(s))(Encoders.STRING)
.rdd.repartition(500).saveAsTextFile(s"$workingPath/summary_json", classOf[GzipCodec])
val allowedRelations : Dataset[RelationPropagation] = spark.read.load(s"$workingPath/scholix").as[Scholix]
.filter(s => !s.getSource().getDnetIdentifier().substring(0,2).equals("70") )
.filter(s => !s.getTarget().getDnetIdentifier().substring(0,2).equals("70"))
.map(s => {
val rp = new RelationPropagation
rp.setSource(Node.newInstance(s.getSource.getDnetIdentifier))//, getPublisherList(s.getSource.getPublisher.asScala.toList)))
rp.setTarget(Node.newInstance(s.getTarget.getDnetIdentifier))//, getPublisherList(s.getTarget.getPublisher.asScala.toList)))
rp.setSemantics(s.getRelationship.getName.toLowerCase())
rp
})
val pubs_rel : Dataset[RelationPropagation] = allowedRelations
.filter(r => r.getSource.getId.startsWith("50")
&& r.getTarget.getId.startsWith("60")
&& Costants.containedInPubSem(r.getSemantics.toLowerCase()))
val dats_rel : Dataset[RelationPropagation] = allowedRelations
.filter(r => r.getSource.getId.startsWith("60")
&& r.getTarget.getId.startsWith("60")
&& Costants.containedInDatsSem(r.getSemantics.toLowerCase())
&& r.getSource.getId != r.getTarget.getId)
val publication_dataset : Dataset[(String, PropagationStructure)] = pubs_rel.map(r => {
val ps = new PropagationStructure
val pv : List[PropagationUse] = List(PropagationUse.copyInstance(Costants.getPublicationValue(r.getSemantics)))
ps.add(r.getSource.getId, pv.asJava)
(r.getTarget.getId, ps)
})
val dataset_dataset : Dataset[(String, DatasetPropagationStructure)] = dats_rel.map(r => {
val ps = new DatasetPropagationStructure
ps.add(r.getTarget.getId, PropagationUse.copyInstance(Costants.getDatasetValue(r.getSemantics)))
(r.getSource.getId, ps)
})
val pl1 : Dataset[(String, PropagationStructure)] = publication_dataset.groupByKey(_._1)
.agg(PropagationAggregator.getDatasetAggregator().toColumn)
val pl2_step1 : Dataset [(String, PropagationStructure)] = pl1.joinWith(dataset_dataset, pl1("_1").equalTo(dataset_dataset("_1")))
.map(propagateDataset)
val pl2 : Dataset [(String, PropagationStructure)] = pl1.union(pl2_step1).groupByKey(_._1)
.agg(PropagationAggregator.getDatasetAggregator().toColumn)
}
}