1
0
Fork 0

Merge branch 'master' of code-repo.d4science.org:D-Net/dnet-hadoop

This commit is contained in:
Michele Artini 2020-03-25 15:56:49 +01:00
commit fd57722c69
54 changed files with 1847 additions and 1011 deletions

View File

@ -31,6 +31,11 @@
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View File

@ -0,0 +1,38 @@
package eu.dnetlib.dhp.schema.action;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import java.io.Serializable;
@JsonDeserialize(using = AtomicActionDeserializer.class)
public class AtomicAction<T extends Oaf> implements Serializable {
private Class<T> clazz;
private T payload;
public AtomicAction() {
}
public AtomicAction(Class<T> clazz, T payload) {
this.clazz = clazz;
this.payload = payload;
}
public Class<T> getClazz() {
return clazz;
}
public void setClazz(Class<T> clazz) {
this.clazz = clazz;
}
public T getPayload() {
return payload;
}
public void setPayload(T payload) {
this.payload = payload;
}
}

View File

@ -0,0 +1,29 @@
package eu.dnetlib.dhp.schema.action;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import java.io.IOException;
public class AtomicActionDeserializer extends JsonDeserializer {
@Override
public Object deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
JsonNode node = jp.getCodec().readTree(jp);
String classTag = node.get("clazz").asText();
JsonNode payload = node.get("payload");
ObjectMapper mapper = new ObjectMapper();
try {
final Class<?> clazz = Class.forName(classTag);
return new AtomicAction(clazz, (Oaf) mapper.readValue(payload.toString(), clazz));
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
}
}

View File

@ -2,10 +2,11 @@ package eu.dnetlib.dhp.schema.oaf;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.Assert;
import static com.google.common.base.Preconditions.checkArgument;
public class Relation extends Oaf {
@ -70,14 +71,34 @@ public class Relation extends Oaf {
}
public void mergeFrom(final Relation r) {
Assert.assertEquals("source ids must be equal", getSource(), r.getSource());
Assert.assertEquals("target ids must be equal", getTarget(), r.getTarget());
Assert.assertEquals("relType(s) must be equal", getRelType(), r.getRelType());
Assert.assertEquals("subRelType(s) must be equal", getSubRelType(), r.getSubRelType());
Assert.assertEquals("relClass(es) must be equal", getRelClass(), r.getRelClass());
checkArgument(Objects.equals(getSource(), r.getSource()),"source ids must be equal");
checkArgument(Objects.equals(getTarget(), r.getTarget()),"target ids must be equal");
checkArgument(Objects.equals(getRelType(), r.getRelType()),"relType(s) must be equal");
checkArgument(Objects.equals(getSubRelType(), r.getSubRelType()),"subRelType(s) must be equal");
checkArgument(Objects.equals(getRelClass(), r.getRelClass()),"relClass(es) must be equal");
setCollectedFrom(Stream.concat(getCollectedFrom().stream(), r.getCollectedFrom().stream())
.distinct() // relies on KeyValue.equals
.collect(Collectors.toList()));
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Relation relation = (Relation) o;
return relType.equals(relation.relType) &&
subRelType.equals(relation.subRelType) &&
relClass.equals(relation.relClass) &&
source.equals(relation.source) &&
target.equals(relation.target) &&
Objects.equals(collectedFrom, relation.collectedFrom);
}
@Override
public int hashCode() {
return Objects.hash(relType, subRelType, relClass, source, target, collectedFrom);
}
}

View File

@ -0,0 +1,37 @@
package eu.dnetlib.dhp.schema.action;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.Relation;
import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
public class AtomicActionTest {
@Test
public void serializationTest() throws IOException {
Relation rel = new Relation();
rel.setSource("1");
rel.setTarget("2");
rel.setRelType("resultResult");
rel.setSubRelType("dedup");
rel.setRelClass("merges");
AtomicAction aa1 = new AtomicAction(Relation.class, rel);
final ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(aa1);
Assert.assertTrue(StringUtils.isNotBlank(json));
AtomicAction aa2 = mapper.readValue(json, AtomicAction.class);
Assert.assertEquals(aa1.getClazz(), aa2.getClazz());
Assert.assertEquals(aa1.getPayload(), aa2.getPayload());
}
}

View File

@ -5,17 +5,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.actionmanager.actions.AtomicAction;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.data.proto.OafProtos;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
@ -84,24 +81,29 @@ public class TransformActions implements Serializable {
log.info(String.format("transforming actions from '%s' to '%s'", sourcePath, targetDirectory));
sc.sequenceFile(sourcePath, Text.class, Text.class)
.mapToPair(a -> new Tuple2<>(a._1(), AtomicAction.fromJSON(a._2().toString())))
.mapToPair(a -> new Tuple2<>(a._1(), eu.dnetlib.actionmanager.actions.AtomicAction.fromJSON(a._2().toString())))
.mapToPair(a -> new Tuple2<>(a._1(), transformAction(a._1().toString(), a._2())))
.filter(t -> StringUtils.isNotBlank(t._2().toString()))
.saveAsHadoopFile(targetDirectory.toString(), Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
}
}
}
private Text transformAction(String atomicaActionId, AtomicAction aa) throws InvalidProtocolBufferException, JsonProcessingException {
private Text transformAction(String atomicaActionId, eu.dnetlib.actionmanager.actions.AtomicAction aa) throws InvalidProtocolBufferException, JsonProcessingException {
final Text out = new Text();
final ObjectMapper mapper = new ObjectMapper();
if (aa.getTargetValue() != null && aa.getTargetValue().length > 0) {
Oaf oaf = ProtoConverter.convert(OafProtos.Oaf.parseFrom(aa.getTargetValue()));
aa.setTargetValue(mapper.writeValueAsString(oaf).getBytes());
out.set(mapper.writeValueAsString(doTransform(aa)));
} else {
if (atomicaActionId.contains("dedupSimilarity")) {
out.set(mapper.writeValueAsString(getRelationAtomicAction(atomicaActionId)));
}
}
return out;
}
private AtomicAction<Relation> getRelationAtomicAction(String atomicaActionId) {
final String[] splitId = atomicaActionId.split("@");
String source = splitId[0];
@ -132,11 +134,44 @@ public class TransformActions implements Serializable {
rel.setDataInfo(d);
aa.setTargetValue(mapper.writeValueAsString(rel).getBytes());
}
return new AtomicAction<>(Relation.class, rel);
}
return new Text(mapper.writeValueAsString(aa));
private AtomicAction doTransform(eu.dnetlib.actionmanager.actions.AtomicAction aa) throws InvalidProtocolBufferException {
final OafProtos.Oaf proto_oaf = OafProtos.Oaf.parseFrom(aa.getTargetValue());
final Oaf oaf = ProtoConverter.convert(proto_oaf);
switch (proto_oaf.getKind()) {
case entity:
switch (proto_oaf.getEntity().getType()) {
case datasource:
return new AtomicAction<>(Datasource.class, (Datasource) oaf);
case organization:
return new AtomicAction<>(Organization.class, (Organization) oaf);
case project:
return new AtomicAction<>(Project.class, (Project) oaf);
case result:
final String resulttypeid = proto_oaf.getEntity().getResult().getMetadata().getResulttype().getClassid();
switch (resulttypeid) {
case "publication":
return new AtomicAction<>(Publication.class, (Publication) oaf);
case "software":
return new AtomicAction<>(Software.class, (Software) oaf);
case "other":
return new AtomicAction<>(OtherResearchProduct.class, (OtherResearchProduct) oaf);
case "dataset":
return new AtomicAction<>(Dataset.class, (Dataset) oaf);
default:
// can be an update, where the resulttype is not specified
return new AtomicAction<>(Result.class, (Result) oaf);
}
default:
throw new IllegalArgumentException("invalid entity type: " + proto_oaf.getEntity().getType());
}
case relation:
return new AtomicAction<>(Relation.class, (Relation) oaf);
default:
throw new IllegalArgumentException("invalid kind: " + proto_oaf.getKind());
}
}
private String getTargetBaseDir(String isLookupUrl) throws ISLookUpException {

View File

@ -15,6 +15,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
@ -87,7 +88,7 @@ public class GenerateEntitiesApplication {
.map(oaf -> oaf.getClass().getSimpleName().toLowerCase() + "|" + convertToJson(oaf)));
}
inputRdd.saveAsTextFile(targetPath);
inputRdd.saveAsTextFile(targetPath, GzipCodec.class);
}

View File

@ -4,6 +4,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
@ -60,7 +61,7 @@ public class DispatchEntitiesApplication {
sc.textFile(sourcePath)
.filter(l -> isEntityType(l, type))
.map(l -> StringUtils.substringAfter(l, "|"))
.saveAsTextFile(targetPath + "/" + type); // use repartition(XXX) ???
.saveAsTextFile(targetPath + "/" + type, GzipCodec.class); // use repartition(XXX) ???
}
private static boolean isEntityType(final String line, final String type) {

View File

@ -54,12 +54,25 @@
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
</configuration>
</global>
<start to='migrate_actionsets' />
<action name='migrate_actionsets'>
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.actions.MigrateActionSet</main-class>
<java-opt>-Dmapred.task.timeout=${distcp_task_timeout}</java-opt>
<arg>-is</arg><arg>${isLookupUrl}</arg>
@ -78,8 +91,6 @@
<action name="transform_actions">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>cluster</mode>
<name>transform_actions</name>

View File

@ -1,4 +1,5 @@
<workflow-app name="import regular entities as Graph (all steps)" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>workingPath</name>
@ -48,6 +49,21 @@
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
</configuration>
</global>
<start to="ReuseContent"/>
<kill name="Kill">
@ -64,8 +80,8 @@
<action name="ResetWorkingPath">
<fs>
<delete path="'${workingPath}'"/>
<mkdir path="'${workingPath}'"/>
<delete path="${workingPath}"/>
<mkdir path="${workingPath}"/>
</fs>
<ok to="ImportDB"/>
<error to="Kill"/>
@ -73,8 +89,6 @@
<action name="ImportDB">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication</main-class>
<arg>-p</arg><arg>${workingPath}/db_records</arg>
<arg>-pgurl</arg><arg>${postgresURL}</arg>
@ -87,8 +101,6 @@
<action name="ImportODF">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${workingPath}/odf_records</arg>
<arg>-mongourl</arg><arg>${mongoURL}</arg>
@ -103,8 +115,6 @@
<action name="ImportOAF">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${workingPath}/oaf_records</arg>
<arg>-mongourl</arg><arg>${mongoURL}</arg>
@ -119,7 +129,7 @@
<action name="ResetAllEntitiesPath">
<fs>
<delete path="'${workingPath}/all_entities'"/>
<delete path="${workingPath}/all_entities"/>
</fs>
<ok to="GenerateEntities"/>
<error to="Kill"/>
@ -127,9 +137,7 @@
<action name="GenerateEntities">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateEntities</name>
<class>eu.dnetlib.dhp.migration.step2.GenerateEntitiesApplication</class>
@ -155,8 +163,8 @@
<action name="ResetGraphPath">
<fs>
<delete path="'${graphBasePath}/graph_raw'"/>
<mkdir path="'${graphBasePath}/graph_raw'"/>
<delete path="${graphBasePath}/graph_raw"/>
<mkdir path="${graphBasePath}/graph_raw"/>
</fs>
<ok to="GenerateGraph"/>
<error to="Kill"/>
@ -164,9 +172,7 @@
<action name="GenerateGraph">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateGraph</name>
<class>eu.dnetlib.dhp.migration.step3.DispatchEntitiesApplication</class>

View File

@ -65,6 +65,15 @@
<groupId>com.arakelian</groupId>
<artifactId>java-jq</artifactId>
</dependency>
<dependency>
<groupId>dom4j</groupId>
<artifactId>dom4j</artifactId>
</dependency>
<dependency>
<groupId>jaxen</groupId>
<artifactId>jaxen</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib</groupId>
@ -83,8 +92,6 @@
<artifactId>jackson-core</artifactId>
</dependency>
</dependencies>

View File

@ -1,79 +0,0 @@
package eu.dnetlib.dedup;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import eu.dnetlib.dedup.graph.ConnectedComponent;
import eu.dnetlib.dedup.graph.GraphProcessor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.util.MapDocumentUtil;
import org.apache.commons.io.IOUtils;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
public class SparkCreateConnectedComponent {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_parameters.json")));
parser.parseArgument(args);
final SparkSession spark = SparkSession
.builder()
.appName(SparkCreateConnectedComponent.class.getSimpleName())
.master(parser.get("master"))
.getOrCreate();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final String inputPath = parser.get("sourcePath");
final String entity = parser.get("entity");
final String targetPath = parser.get("targetPath");
// final DedupConfig dedupConf = DedupConfig.load(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf2.json")));
final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf"));
final JavaPairRDD<Object, String> vertexes = sc.textFile(inputPath + "/" + entity)
.map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
.mapToPair((PairFunction<String, Object, String>)
s -> new Tuple2<Object, String>(getHashcode(s), s)
);
final Dataset<Relation> similarityRelations = spark.read().load(DedupUtility.createSimRelPath(targetPath,entity)).as(Encoders.bean(Relation.class));
final RDD<Edge<String>> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())).rdd();
final JavaRDD<ConnectedComponent> cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, dedupConf.getWf().getMaxIterations()).toJavaRDD();
final Dataset<Relation> mergeRelation = spark.createDataset(cc.filter(k->k.getDocIds().size()>1).flatMap((FlatMapFunction<ConnectedComponent, Relation>) c ->
c.getDocIds()
.stream()
.flatMap(id -> {
List<Relation> tmp = new ArrayList<>();
Relation r = new Relation();
r.setSource(c.getCcId());
r.setTarget(id);
r.setRelClass("merges");
tmp.add(r);
r = new Relation();
r.setTarget(c.getCcId());
r.setSource(id);
r.setRelClass("isMergedIn");
tmp.add(r);
return tmp.stream();
}).iterator()).rdd(), Encoders.bean(Relation.class));
mergeRelation.write().mode("overwrite").save(DedupUtility.createMergeRelPath(targetPath,entity));
}
public static long getHashcode(final String id) {
return Hashing.murmur3_128().hashUnencodedChars(id).asLong();
}
}

View File

@ -1,39 +0,0 @@
package eu.dnetlib.dedup;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.pace.config.DedupConfig;
import org.apache.commons.io.IOUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
public class SparkCreateDedupRecord {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateDedupRecord.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedupRecord_parameters.json")));
parser.parseArgument(args);
final SparkSession spark = SparkSession
.builder()
.appName(SparkCreateDedupRecord.class.getSimpleName())
.master(parser.get("master"))
.getOrCreate();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final String sourcePath = parser.get("sourcePath");
final String entity = parser.get("entity");
final String dedupPath = parser.get("dedupPath");
// final DedupConfig dedupConf = DedupConfig.load(IOUtils.toString(SparkCreateDedupRecord.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf2.json")));
final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf"));
final JavaRDD<OafEntity> dedupRecord = DedupRecordFactory.createDedupRecord(sc, spark, DedupUtility.createMergeRelPath(dedupPath,entity), DedupUtility.createEntityPath(sourcePath,entity), OafEntityType.valueOf(entity), dedupConf);
dedupRecord.map(r-> {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(r);
}).saveAsTextFile(dedupPath+"/"+entity+"_dedup_record_json");
}
}

View File

@ -1,73 +0,0 @@
package eu.dnetlib.dedup;
import com.google.common.hash.Hashing;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.util.MapDocumentUtil;
import org.apache.commons.io.IOUtils;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.util.List;
/**
* This Spark class creates similarity relations between entities, saving result
*
* param request:
* sourcePath
* entityType
* target Path
*/
public class SparkCreateSimRels {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_parameters.json")));
parser.parseArgument(args);
final SparkSession spark = SparkSession
.builder()
.appName(SparkCreateSimRels.class.getSimpleName())
.master(parser.get("master"))
.getOrCreate();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final String inputPath = parser.get("sourcePath");
final String entity = parser.get("entity");
final String targetPath = parser.get("targetPath");
// final DedupConfig dedupConf = DedupConfig.load(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf"));
final long total = sc.textFile(inputPath + "/" + entity).count();
JavaPairRDD<String, MapDocument> mapDocument = sc.textFile(inputPath + "/" + entity)
.mapToPair(s->{
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf,s);
return new Tuple2<>(d.getIdentifier(), d);});
//create blocks for deduplication
JavaPairRDD<String, List<MapDocument>> blocks = Deduper.createsortedBlocks(sc, mapDocument, dedupConf);
// JavaPairRDD<String, Iterable<MapDocument>> blocks = Deduper.createBlocks(sc, mapDocument, dedupConf);
//create relations by comparing only elements in the same group
final JavaPairRDD<String,String> dedupRels = Deduper.computeRelations2(sc, blocks, dedupConf);
// final JavaPairRDD<String,String> dedupRels = Deduper.computeRelations(sc, blocks, dedupConf);
final JavaRDD<Relation> isSimilarToRDD = dedupRels.map(simRel -> {
final Relation r = new Relation();
r.setSource(simRel._1());
r.setTarget(simRel._2());
r.setRelClass("isSimilarTo");
return r;
});
spark.createDataset(isSimilarToRDD.rdd(), Encoders.bean(Relation.class)).write().mode("overwrite").save( DedupUtility.createSimRelPath(targetPath,entity));
}
}

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dedup;
package eu.dnetlib.dhp.dedup;
import eu.dnetlib.dhp.schema.oaf.Field;
import org.apache.commons.lang.StringUtils;

View File

@ -1,11 +1,9 @@
package eu.dnetlib.dedup;
package eu.dnetlib.dhp.dedup;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.util.MapDocumentUtil;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@ -16,9 +14,6 @@ import org.codehaus.jackson.map.ObjectMapper;
import scala.Tuple2;
import java.util.Collection;
import java.util.Random;
import static java.util.stream.Collectors.toMap;
public class DedupRecordFactory {

View File

@ -1,35 +1,32 @@
package eu.dnetlib.dedup;
package eu.dnetlib.dhp.dedup;
import com.google.common.collect.Sets;
import com.wcohen.ss.JaroWinkler;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.Person;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import scala.Tuple2;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.io.StringReader;
import java.security.MessageDigest;
import java.text.Normalizer;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class DedupUtility {
private static final Double THRESHOLD = 0.95;
@ -54,38 +51,6 @@ public class DedupUtility {
return accumulators;
}
public static JavaRDD<String> loadDataFromHDFS(String path, JavaSparkContext context) {
return context.textFile(path);
}
public static void deleteIfExists(String path) throws IOException {
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(conf);
if (fileSystem.exists(new Path(path))) {
fileSystem.delete(new Path(path), true);
}
}
public static DedupConfig loadConfigFromHDFS(String path) throws IOException {
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(conf);
FSDataInputStream inputStream = new FSDataInputStream(fileSystem.open(new Path(path)));
return DedupConfig.load(IOUtils.toString(inputStream, StandardCharsets.UTF_8.name()));
}
static <T> String readFromClasspath(final String filename, final Class<T> clazz) {
final StringWriter sw = new StringWriter();
try {
IOUtils.copy(clazz.getResourceAsStream(filename), sw);
return sw.toString();
} catch (final IOException e) {
throw new RuntimeException("cannot load resource from classpath: " + filename);
}
}
static Set<String> getGroupingKeys(DedupConfig conf, MapDocument doc) {
return Sets.newHashSet(BlacklistAwareClusteringCombiner.filterAndCombine(doc, conf));
}
@ -146,16 +111,20 @@ public class DedupUtility {
});
}
public static String createDedupRecordPath(final String basePath, final String actionSetId, final String entityType) {
return String.format("%s/%s/%s_deduprecord", basePath, actionSetId, entityType);
}
public static String createEntityPath(final String basePath, final String entityType) {
return String.format("%s/%s", basePath, entityType);
}
public static String createSimRelPath(final String basePath, final String entityType) {
return String.format("%s/%s_simRel", basePath, entityType);
public static String createSimRelPath(final String basePath, final String actionSetId,final String entityType) {
return String.format("%s/%s/%s_simrel", basePath, actionSetId, entityType);
}
public static String createMergeRelPath(final String basePath, final String entityType) {
return String.format("%s/%s_mergeRel", basePath, entityType);
public static String createMergeRelPath(final String basePath, final String actionSetId, final String entityType) {
return String.format("%s/%s/%s_mergerel", basePath, actionSetId, entityType);
}
private static Double sim(Author a, Author b) {
@ -216,4 +185,37 @@ public class DedupUtility {
return false;
return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue()));
}
public static List<DedupConfig> getConfigurations(String isLookUpUrl, String orchestrator) throws ISLookUpException, DocumentException {
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookUpUrl);
final String xquery = String.format("/RESOURCE_PROFILE[.//DEDUPLICATION/ACTION_SET/@id = '%s']", orchestrator);
String orchestratorProfile = isLookUpService.getResourceProfileByQuery(xquery);
final Document doc = new SAXReader().read(new StringReader(orchestratorProfile));
final String actionSetId = doc.valueOf("//DEDUPLICATION/ACTION_SET/@id");
final List<DedupConfig> configurations = new ArrayList<>();
for (final Object o : doc.selectNodes("//SCAN_SEQUENCE/SCAN")) {
configurations.add(loadConfig(isLookUpService, actionSetId, o));
}
return configurations;
}
private static DedupConfig loadConfig(final ISLookUpService isLookUpService, final String actionSetId, final Object o)
throws ISLookUpException {
final Element s = (Element) o;
final String configProfileId = s.attributeValue("id");
final String conf =
isLookUpService.getResourceProfileByQuery(String.format(
"for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
configProfileId));
final DedupConfig dedupConfig = DedupConfig.load(conf);
dedupConfig.getWf().setConfigurationId(actionSetId);
return dedupConfig;
}
}

View File

@ -1,7 +1,6 @@
package eu.dnetlib.dedup;
package eu.dnetlib.dhp.dedup;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.util.BlockProcessor;
import eu.dnetlib.pace.util.MapDocumentUtil;

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dedup;
package eu.dnetlib.dhp.dedup;
public enum OafEntityType {

View File

@ -0,0 +1,100 @@
package eu.dnetlib.dhp.dedup;
import com.google.common.hash.Hashing;
import eu.dnetlib.dhp.dedup.graph.ConnectedComponent;
import eu.dnetlib.dhp.dedup.graph.GraphProcessor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.util.MapDocumentUtil;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
public class SparkCreateConnectedComponent {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createCC_parameters.json")));
parser.parseArgument(args);
new SparkCreateConnectedComponent().run(parser);
}
private void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException {
final String graphBasePath = parser.get("graphBasePath");
final String workingPath = parser.get("workingPath");
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
try (SparkSession spark = getSparkSession(parser)) {
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) {
final String entity = dedupConf.getWf().getEntityType();
final String subEntity = dedupConf.getWf().getSubEntityValue();
final JavaPairRDD<Object, String> vertexes = sc.textFile(graphBasePath + "/" + subEntity)
.map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
.mapToPair((PairFunction<String, Object, String>)
s -> new Tuple2<Object, String>(getHashcode(s), s)
);
final Dataset<Relation> similarityRelations = spark.read().load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)).as(Encoders.bean(Relation.class));
final RDD<Edge<String>> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())).rdd();
final JavaRDD<ConnectedComponent> cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, dedupConf.getWf().getMaxIterations()).toJavaRDD();
final Dataset<Relation> mergeRelation = spark.createDataset(cc.filter(k -> k.getDocIds().size() > 1).flatMap((FlatMapFunction<ConnectedComponent, Relation>) c ->
c.getDocIds()
.stream()
.flatMap(id -> {
List<Relation> tmp = new ArrayList<>();
Relation r = new Relation();
r.setSource(c.getCcId());
r.setTarget(id);
r.setRelClass("merges");
tmp.add(r);
r = new Relation();
r.setTarget(c.getCcId());
r.setSource(id);
r.setRelClass("isMergedIn");
tmp.add(r);
return tmp.stream();
}).iterator()).rdd(), Encoders.bean(Relation.class));
mergeRelation.write().mode("overwrite").save(DedupUtility.createMergeRelPath(workingPath, actionSetId, entity));
}
}
}
public static long getHashcode(final String id) {
return Hashing.murmur3_128().hashString(id).asLong();
}
private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
SparkConf conf = new SparkConf();
return SparkSession
.builder()
.appName(SparkCreateSimRels.class.getSimpleName())
.master(parser.get("master"))
.config(conf)
.enableHiveSupport()
.getOrCreate();
}
}

View File

@ -0,0 +1,62 @@
package eu.dnetlib.dhp.dedup;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.pace.config.DedupConfig;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
public class SparkCreateDedupRecord {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateDedupRecord.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createDedupRecord_parameters.json")));
parser.parseArgument(args);
new SparkCreateDedupRecord().run(parser);
}
private void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException {
final String graphBasePath = parser.get("graphBasePath");
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
try (SparkSession spark = getSparkSession(parser)) {
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) {
String subEntity = dedupConf.getWf().getSubEntityValue();
final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
final OafEntityType entityType = OafEntityType.valueOf(subEntity);
final JavaRDD<OafEntity> dedupRecord =
DedupRecordFactory.createDedupRecord(sc, spark, mergeRelPath, entityPath, entityType, dedupConf);
dedupRecord.map(r -> {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(r);
}).saveAsTextFile(DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity));
}
}
}
private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
SparkConf conf = new SparkConf();
return SparkSession
.builder()
.appName(SparkCreateDedupRecord.class.getSimpleName())
.master(parser.get("master"))
.config(conf)
.enableHiveSupport()
.getOrCreate();
}
}

View File

@ -0,0 +1,134 @@
package eu.dnetlib.dhp.dedup;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.util.MapDocumentUtil;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import scala.Tuple2;
import java.io.Serializable;
import java.util.List;
public class SparkCreateSimRels implements Serializable {
private static final Log log = LogFactory.getLog(SparkCreateSimRels.class);
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createSimRels_parameters.json")));
parser.parseArgument(args);
new SparkCreateSimRels().run(parser);
}
private void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException {
//read oozie parameters
final String graphBasePath = parser.get("graphBasePath");
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
System.out.println(String.format("graphBasePath: '%s'", graphBasePath));
System.out.println(String.format("isLookUpUrl: '%s'", isLookUpUrl));
System.out.println(String.format("actionSetId: '%s'", actionSetId));
System.out.println(String.format("workingPath: '%s'", workingPath));
try (SparkSession spark = getSparkSession(parser)) {
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
//for each dedup configuration
for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) {
final String entity = dedupConf.getWf().getEntityType();
final String subEntity = dedupConf.getWf().getSubEntityValue();
JavaPairRDD<String, MapDocument> mapDocument = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
.mapToPair(s -> {
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
return new Tuple2<>(d.getIdentifier(), d);
});
//create blocks for deduplication
JavaPairRDD<String, List<MapDocument>> blocks = Deduper.createsortedBlocks(sc, mapDocument, dedupConf);
//create relations by comparing only elements in the same group
final JavaPairRDD<String, String> dedupRels = Deduper.computeRelations2(sc, blocks, dedupConf);
JavaRDD<Relation> relationsRDD = dedupRels.map(r -> createSimRel(r._1(), r._2(), entity));
//save the simrel in the workingdir
spark.createDataset(relationsRDD.rdd(), Encoders.bean(Relation.class))
.write()
.mode("overwrite")
.save(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity));
}
}
}
/**
* Utility method used to create an atomic action from a Relation object
* @param relation input relation
* @return A tuple2 with [id, json serialization of the atomic action]
* @throws JsonProcessingException
*/
public Tuple2<Text, Text> createSequenceFileRow(Relation relation) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
String id = relation.getSource() + "@" + relation.getRelClass() + "@" + relation.getTarget();
AtomicAction<Relation> aa = new AtomicAction<>(Relation.class, relation);
return new Tuple2<>(
new Text(id),
new Text(mapper.writeValueAsString(aa))
);
}
public Relation createSimRel(String source, String target, String entity){
final Relation r = new Relation();
r.setSource(source);
r.setTarget(target);
switch(entity){
case "result":
r.setRelClass("resultResult_dedupSimilarity_isSimilarTo");
break;
case "organization":
r.setRelClass("organizationOrganization_dedupSimilarity_isSimilarTo");
break;
default:
r.setRelClass("isSimilarTo");
break;
}
return r;
}
private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
SparkConf conf = new SparkConf();
return SparkSession
.builder()
.appName(SparkCreateSimRels.class.getSimpleName())
.master(parser.get("master"))
.config(conf)
.getOrCreate();
}
}

View File

@ -0,0 +1,169 @@
package eu.dnetlib.dhp.dedup;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.pace.util.MapDocumentUtil;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.io.IOException;
public class SparkPropagateRelation {
enum FieldType {
SOURCE,
TARGET
}
final static String SOURCEJSONPATH = "$.source";
final static String TARGETJSONPATH = "$.target";
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkPropagateRelation.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/propagateRelation_parameters.json")));
parser.parseArgument(args);
new SparkPropagateRelation().run(parser);
}
public void run(ArgumentApplicationParser parser) {
final String graphBasePath = parser.get("graphBasePath");
final String workingPath = parser.get("workingPath");
final String dedupGraphPath = parser.get("dedupGraphPath");
try (SparkSession spark = getSparkSession(parser)) {
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final Dataset<Relation> mergeRels = spark.read().load(DedupUtility.createMergeRelPath(workingPath, "*", "*")).as(Encoders.bean(Relation.class));
final JavaPairRDD<String, String> mergedIds = mergeRels
.where("relClass == 'merges'")
.select(mergeRels.col("source"), mergeRels.col("target"))
.distinct()
.toJavaRDD()
.mapToPair((PairFunction<Row, String, String>) r -> new Tuple2<>(r.getString(1), r.getString(0)));
JavaRDD<String> relations = sc.textFile(DedupUtility.createEntityPath(graphBasePath, "relation"));
JavaRDD<String> newRels = relations.mapToPair(
(PairFunction<String, String, String>) s ->
new Tuple2<>(MapDocumentUtil.getJPathString(SOURCEJSONPATH, s), s))
.leftOuterJoin(mergedIds)
.map((Function<Tuple2<String, Tuple2<String, Optional<String>>>, String>) v1 -> {
if (v1._2()._2().isPresent()) {
return replaceField(v1._2()._1(), v1._2()._2().get(), FieldType.SOURCE);
}
return v1._2()._1();
})
.mapToPair(
(PairFunction<String, String, String>) s ->
new Tuple2<>(MapDocumentUtil.getJPathString(TARGETJSONPATH, s), s))
.leftOuterJoin(mergedIds)
.map((Function<Tuple2<String, Tuple2<String, Optional<String>>>, String>) v1 -> {
if (v1._2()._2().isPresent()) {
return replaceField(v1._2()._1(), v1._2()._2().get(), FieldType.TARGET);
}
return v1._2()._1();
}).filter(SparkPropagateRelation::containsDedup)
.repartition(500);
//update deleted by inference
relations = relations.mapToPair(
(PairFunction<String, String, String>) s ->
new Tuple2<>(MapDocumentUtil.getJPathString(SOURCEJSONPATH, s), s))
.leftOuterJoin(mergedIds)
.map((Function<Tuple2<String, Tuple2<String, Optional<String>>>, String>) v1 -> {
if (v1._2()._2().isPresent()) {
return updateDeletedByInference(v1._2()._1(), Relation.class);
}
return v1._2()._1();
})
.mapToPair(
(PairFunction<String, String, String>) s ->
new Tuple2<>(MapDocumentUtil.getJPathString(TARGETJSONPATH, s), s))
.leftOuterJoin(mergedIds)
.map((Function<Tuple2<String, Tuple2<String, Optional<String>>>, String>) v1 -> {
if (v1._2()._2().isPresent()) {
return updateDeletedByInference(v1._2()._1(), Relation.class);
}
return v1._2()._1();
})
.repartition(500);
newRels.union(relations).repartition(1000)
.saveAsTextFile(DedupUtility.createEntityPath(dedupGraphPath, "relation"), GzipCodec.class);
}
}
private static boolean containsDedup(final String json) {
final String source = MapDocumentUtil.getJPathString(SOURCEJSONPATH, json);
final String target = MapDocumentUtil.getJPathString(TARGETJSONPATH, json);
return source.toLowerCase().contains("dedup") || target.toLowerCase().contains("dedup");
}
private static String replaceField(final String json, final String id, final FieldType type) {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
try {
Relation relation = mapper.readValue(json, Relation.class);
if (relation.getDataInfo() == null)
relation.setDataInfo(new DataInfo());
relation.getDataInfo().setDeletedbyinference(false);
switch (type) {
case SOURCE:
relation.setSource(id);
return mapper.writeValueAsString(relation);
case TARGET:
relation.setTarget(id);
return mapper.writeValueAsString(relation);
default:
throw new IllegalArgumentException("");
}
} catch (IOException e) {
throw new RuntimeException("unable to deserialize json relation: " + json, e);
}
}
private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
SparkConf conf = new SparkConf();
return SparkSession
.builder()
.appName(SparkPropagateRelation.class.getSimpleName())
.master(parser.get("master"))
.config(conf)
.enableHiveSupport()
.getOrCreate();
}
private static <T extends Oaf> String updateDeletedByInference(final String json, final Class<T> clazz) {
final ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
try {
Oaf entity = mapper.readValue(json, clazz);
if (entity.getDataInfo()== null)
entity.setDataInfo(new DataInfo());
entity.getDataInfo().setDeletedbyinference(true);
return mapper.writeValueAsString(entity);
} catch (IOException e) {
throw new RuntimeException("Unable to convert json", e);
}
}
}

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dedup;
package eu.dnetlib.dhp.dedup;
import eu.dnetlib.pace.util.Reporter;
import org.apache.commons.logging.Log;

View File

@ -0,0 +1,141 @@
package eu.dnetlib.dhp.dedup;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.pace.util.MapDocumentUtil;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.io.IOException;
import java.io.Serializable;
public class SparkUpdateEntity implements Serializable {
final String IDJSONPATH = "$.id";
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkUpdateEntity.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/updateEntity_parameters.json")));
parser.parseArgument(args);
new SparkUpdateEntity().run(parser);
}
public boolean mergeRelExists(String basePath, String entity) throws IOException {
boolean result = false;
FileSystem fileSystem = FileSystem.get(new Configuration());
FileStatus[] fileStatuses = fileSystem.listStatus(new Path(basePath));
for (FileStatus fs : fileStatuses) {
if (fs.isDirectory())
if (fileSystem.exists(new Path(DedupUtility.createMergeRelPath(basePath, fs.getPath().getName(), entity))))
result = true;
}
return result;
}
public void run(ArgumentApplicationParser parser) throws IOException {
final String graphBasePath = parser.get("graphBasePath");
final String workingPath = parser.get("workingPath");
final String dedupGraphPath = parser.get("dedupGraphPath");
try (SparkSession spark = getSparkSession(parser)) {
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
//for each entity
for (OafEntityType entity: OafEntityType.values()) {
JavaRDD<String> sourceEntity = sc.textFile(DedupUtility.createEntityPath(graphBasePath, entity.toString()));
if (mergeRelExists(workingPath, entity.toString())) {
final Dataset<Relation> rel = spark.read().load(DedupUtility.createMergeRelPath(workingPath, "*", entity.toString())).as(Encoders.bean(Relation.class));
final JavaPairRDD<String, String> mergedIds = rel
.where("relClass == 'merges'")
.select(rel.col("target"))
.distinct()
.toJavaRDD()
.mapToPair((PairFunction<Row, String, String>) r -> new Tuple2<>(r.getString(0), "d"));
final JavaRDD<String> dedupEntity = sc.textFile(DedupUtility.createDedupRecordPath(workingPath, "*", entity.toString()));
JavaPairRDD<String, String> entitiesWithId = sourceEntity.mapToPair((PairFunction<String, String, String>) s -> new Tuple2<>(MapDocumentUtil.getJPathString(IDJSONPATH, s), s));
JavaRDD<String> map = entitiesWithId.leftOuterJoin(mergedIds).map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), getOafClass(entity)) : k._2()._1());
sourceEntity = map.union(dedupEntity);
}
sourceEntity.saveAsTextFile(dedupGraphPath + "/" + entity, GzipCodec.class);
}
}
}
public Class<? extends Oaf> getOafClass(OafEntityType className) {
switch (className.toString()) {
case "publication":
return Publication.class;
case "dataset":
return eu.dnetlib.dhp.schema.oaf.Dataset.class;
case "datasource":
return Datasource.class;
case "software":
return Software.class;
case "organization":
return Organization.class;
case "otherresearchproduct":
return OtherResearchProduct.class;
case "project":
return Project.class;
default:
throw new IllegalArgumentException("Illegal type " + className);
}
}
private static <T extends Oaf> String updateDeletedByInference(final String json, final Class<T> clazz) {
final ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
try {
Oaf entity = mapper.readValue(json, clazz);
if (entity.getDataInfo()== null)
entity.setDataInfo(new DataInfo());
entity.getDataInfo().setDeletedbyinference(true);
return mapper.writeValueAsString(entity);
} catch (IOException e) {
throw new RuntimeException("Unable to convert json", e);
}
}
private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
SparkConf conf = new SparkConf();
return SparkSession
.builder()
.appName(SparkUpdateEntity.class.getSimpleName())
.master(parser.get("master"))
.config(conf)
.enableHiveSupport()
.getOrCreate();
}
}

View File

@ -1,7 +1,7 @@
package eu.dnetlib.dedup.graph;
package eu.dnetlib.dhp.dedup.graph;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dedup.DedupUtility;
import eu.dnetlib.dhp.dedup.DedupUtility;
import eu.dnetlib.pace.util.PaceException;
import org.apache.commons.lang.StringUtils;
import org.codehaus.jackson.annotate.JsonIgnore;

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dedup.graph
package eu.dnetlib.dhp.dedup.graph
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

View File

@ -15,12 +15,4 @@
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hive_db_name</name>
<value>openaire</value>
</property>
</configuration>

View File

@ -0,0 +1,105 @@
<workflow-app name="Update Graph" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphBasePath</name>
<description>the raw graph base path</description>
</property>
<property>
<name>workingPath</name>
<description>path of the working directory</description>
</property>
<property>
<name>dedupGraphPath</name>
<description>path of the dedup graph</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
</configuration>
</global>
<start to="UpdateEntity"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="UpdateEntity">
<spark xmlns="uri:oozie:spark-action:0.2">
<prepare>
<delete path='${dedupGraphPath}'/>
</prepare>
<master>yarn</master>
<mode>cluster</mode>
<name>Update Entity</name>
<class>eu.dnetlib.dhp.dedup.SparkUpdateEntity</class>
<jar>dhp-dedup-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.sql.warehouse.dir="/user/hive/warehouse"
</spark-opts>
<arg>-mt</arg><arg>yarn</arg>
<arg>--i</arg><arg>${graphBasePath}</arg>
<arg>--w</arg><arg>${workingPath}</arg>
<arg>--o</arg><arg>${dedupGraphPath}</arg>
</spark>
<ok to="PropagateRelation"/>
<error to="Kill"/>
</action>
<action name="PropagateRelation">
<spark xmlns="uri:oozie:spark-action:0.2">
<prepare>
<delete path='${dedupGraphPath}/relation'/>
</prepare>
<master>yarn</master>
<mode>cluster</mode>
<name>Update Relations</name>
<class>eu.dnetlib.dhp.dedup.SparkPropagateRelation</class>
<jar>dhp-dedup-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.sql.warehouse.dir="/user/hive/warehouse"
</spark-opts>
<arg>-mt</arg><arg>yarn</arg>
<arg>--i</arg><arg>${graphBasePath}</arg>
<arg>--o</arg><arg>${dedupGraphPath}</arg>
<arg>--w</arg><arg>${workingPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,32 @@
[
{
"paramName": "mt",
"paramLongName": "master",
"paramDescription": "should be local or yarn",
"paramRequired": true
},
{
"paramName": "asi",
"paramLongName": "actionSetId",
"paramDescription": "action set identifier (name of the orchestrator)",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "graphBasePath",
"paramDescription": "the base path of the raw graph",
"paramRequired": true
},
{
"paramName": "la",
"paramLongName": "isLookUpUrl",
"paramDescription": "the url for the lookup service",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workingPath",
"paramDescription": "path for the working directory",
"paramRequired": true
}
]

View File

@ -0,0 +1,32 @@
[
{
"paramName": "mt",
"paramLongName": "master",
"paramDescription": "should be local or yarn",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "graphBasePath",
"paramDescription": "the base path of raw graph",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workingPath",
"paramDescription": "the working directory path",
"paramRequired": true
},
{
"paramName": "la",
"paramLongName": "isLookUpUrl",
"paramDescription": "the url of the lookup service",
"paramRequired": true
},
{
"paramName": "asi",
"paramLongName": "actionSetId",
"paramDescription": "the id of the actionset (orchestrator)",
"paramRequired": true
}
]

View File

@ -0,0 +1,32 @@
[
{
"paramName": "mt",
"paramLongName": "master",
"paramDescription": "should be local or yarn",
"paramRequired": true
},
{
"paramName": "la",
"paramLongName": "isLookUpUrl",
"paramDescription": "address for the LookUp",
"paramRequired": true
},
{
"paramName": "asi",
"paramLongName": "actionSetId",
"paramDescription": "action set identifier (name of the orchestrator)",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "graphBasePath",
"paramDescription": "the base path of the raw graph",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workingPath",
"paramDescription": "path of the working directory",
"paramRequired": true
}
]

View File

@ -1,33 +0,0 @@
[
{
"paramName": "mt",
"paramLongName": "master",
"paramDescription": "should be local or yarn",
"paramRequired": true
},
{
"paramName": "s",
"paramLongName": "sourcePath",
"paramDescription": "the path of the sequential file to read",
"paramRequired": true
},
{
"paramName": "e",
"paramLongName": "entity",
"paramDescription": "the type of entity to be deduped",
"paramRequired": true
},
{
"paramName": "c",
"paramLongName": "dedupConf",
"paramDescription": "dedup configuration to be used",
"compressed": true,
"paramRequired": true
},
{
"paramName": "t",
"paramLongName": "targetPath",
"paramDescription": "target path to save dedup result",
"paramRequired": true
}
]

View File

@ -1,126 +0,0 @@
<workflow-app name="Dedup Entities" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>entity</name>
<description>the entity that should be processed</description>
</property>
<property>
<name>dedupConf</name>
<description>the dedup Configuration</description>
</property>
<property>
<name>targetPath</name>
<description>the target path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<start to="CreateSimRels"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<!-- <action name="DeleteTargetPath">-->
<!-- <fs>-->
<!-- <delete path='${targetPath}/${entity}_simrel'/>-->
<!-- <delete path='${targetPath}/${entity}_mergeRels'/>-->
<!-- </fs>-->
<!-- <ok to="CreateSimRels"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<action name="CreateSimRels">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create Similarity Relations</name>
<class>eu.dnetlib.dedup.SparkCreateSimRels</class>
<jar>dhp-dedup-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --conf
spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf
spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf
spark.sql.warehouse.dir="/user/hive/warehouse"
</spark-opts>
<arg>-mt</arg><arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg>
<arg>--entity</arg><arg>${entity}</arg>
<arg>--dedupConf</arg><arg>${dedupConf}</arg>
</spark>
<ok to="CreateConnectedComponents"/>
<error to="Kill"/>
</action>
<action name="CreateConnectedComponents">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create Connected Components</name>
<class>eu.dnetlib.dedup.SparkCreateConnectedComponent</class>
<jar>dhp-dedup-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --conf
spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf
spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf
spark.sql.warehouse.dir="/user/hive/warehouse"
</spark-opts>
<arg>-mt</arg><arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg>
<arg>--entity</arg><arg>${entity}</arg>
<arg>--dedupConf</arg><arg>${dedupConf}</arg>
</spark>
<ok to="CreateDedupRecord"/>
<error to="Kill"/>
</action>
<action name="CreateDedupRecord">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create Dedup Record</name>
<class>eu.dnetlib.dedup.SparkCreateDedupRecord</class>
<jar>dhp-dedup-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --conf
spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf
spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf
spark.sql.warehouse.dir="/user/hive/warehouse"
</spark-opts>
<arg>-mt</arg><arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--dedupPath</arg><arg>${dedupPath}</arg>
<arg>--entity</arg><arg>${entity}</arg>
<arg>--dedupConf</arg><arg>${dedupConf}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,26 @@
[
{
"paramName": "mt",
"paramLongName": "master",
"paramDescription": "should be local or yarn",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "graphBasePath",
"paramDescription": "the base path of raw graph",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workingPath",
"paramDescription": "the working directory path",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "dedupGraphPath",
"paramDescription": "the path of the dedup graph",
"paramRequired": true
}
]

View File

@ -0,0 +1,18 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -0,0 +1,138 @@
<workflow-app name="Duplicate Scan" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphBasePath</name>
<description>the raw graph base path</description>
</property>
<property>
<name>isLookUpUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>actionSetId</name>
<description>id of the actionSet</description>
</property>
<property>
<name>workingPath</name>
<description>path for the working directory</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
</configuration>
</global>
<start to="CreateSimRel"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="CreateSimRel">
<spark xmlns="uri:oozie:spark-action:0.2">
<prepare>
<delete path="${workingPath}/${actionSetId}/*_simrel"/>
</prepare>
<master>yarn</master>
<mode>cluster</mode>
<name>Create Similarity Relations</name>
<class>eu.dnetlib.dhp.dedup.SparkCreateSimRels</class>
<jar>dhp-dedup-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
</spark-opts>
<arg>-mt</arg><arg>yarn</arg>
<arg>--i</arg><arg>${graphBasePath}</arg>
<arg>--la</arg><arg>${isLookUpUrl}</arg>
<arg>--asi</arg><arg>${actionSetId}</arg>
<arg>--w</arg><arg>${workingPath}</arg>
</spark>
<ok to="CreateMergeRel"/>
<error to="Kill"/>
</action>
<action name="CreateMergeRel">
<spark xmlns="uri:oozie:spark-action:0.2">
<prepare>
<delete path='${workingPath}/${actionSetId}/*_mergerel'/>
</prepare>
<master>yarn</master>
<mode>cluster</mode>
<name>Create Merge Relations</name>
<class>eu.dnetlib.dhp.dedup.SparkCreateConnectedComponent</class>
<jar>dhp-dedup-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.sql.warehouse.dir="/user/hive/warehouse"
</spark-opts>
<arg>-mt</arg><arg>yarn</arg>
<arg>--i</arg><arg>${graphBasePath}</arg>
<arg>--w</arg><arg>${workingPath}</arg>
<arg>--la</arg><arg>${isLookUpUrl}</arg>
<arg>--asi</arg><arg>${actionSetId}</arg>
</spark>
<ok to="CreateDedupRecord"/>
<error to="Kill"/>
</action>
<action name="CreateDedupRecord">
<spark xmlns="uri:oozie:spark-action:0.2">
<prepare>
<delete path='${workingPath}/${actionSetId}/*_deduprecord'/>
</prepare>
<master>yarn</master>
<mode>cluster</mode>
<name>Create Dedup Record</name>
<class>eu.dnetlib.dhp.dedup.SparkCreateDedupRecord</class>
<jar>dhp-dedup-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.sql.warehouse.dir="/user/hive/warehouse"
</spark-opts>
<arg>-mt</arg><arg>yarn</arg>
<arg>--i</arg><arg>${graphBasePath}</arg>
<arg>--w</arg><arg>${workingPath}</arg>
<arg>--la</arg><arg>${isLookUpUrl}</arg>
<arg>--asi</arg><arg>${actionSetId}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,26 @@
[
{
"paramName": "mt",
"paramLongName": "master",
"paramDescription": "should be local or yarn",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "graphBasePath",
"paramDescription": "the base path of raw graph",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workingPath",
"paramDescription": "the working directory path",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "dedupGraphPath",
"paramDescription": "the path of the dedup graph",
"paramRequired": true
}
]

View File

@ -1,31 +0,0 @@
package eu.dnetlib.dedup.jpath;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.JsonPath;
import org.apache.commons.io.IOUtils;
import org.junit.Test;
import java.util.List;
import java.util.Map;
public class JsonPathTest {
@Test
public void testJPath () throws Exception {
final String json = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/sample.json"));
List<Map<String, Object>> pid = JsonPath.read(json, "$.pid[*]");
// System.out.println(json);
pid.forEach(it -> {
try {
System.out.println(new ObjectMapper().writeValueAsString(it));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
});
}
}

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dedup;
package eu.dnetlib.dhp.dedup;
import eu.dnetlib.dhp.schema.oaf.Publication;
import org.apache.commons.io.IOUtils;
@ -13,12 +13,12 @@ import java.util.stream.Collectors;
public class MergeAuthorTest {
List<Publication> publicationsToMerge;
final ObjectMapper mapper = new ObjectMapper();
private List<Publication> publicationsToMerge;
private final ObjectMapper mapper = new ObjectMapper();
@Before
public void setUp() throws Exception {
final String json = IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dedup/json/authors_merge.json"));
final String json = IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/dedup/json/authors_merge.json"));
publicationsToMerge = Arrays.asList(json.split("\n")).stream().map(s-> {
@ -28,34 +28,18 @@ public class MergeAuthorTest {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
}
@Test
public void test() throws Exception {
Publication dedup = new Publication();
publicationsToMerge.forEach(p-> {
dedup.mergeFrom(p);
dedup.setAuthor(DedupUtility.mergeAuthor(dedup.getAuthor(),p.getAuthor()));
});
System.out.println(mapper.writeValueAsString(dedup));
}
}

View File

@ -1,19 +1,16 @@
package eu.dnetlib.dedup;
package eu.dnetlib.dhp.dedup;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Publication;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.List;
public class SparkCreateDedupTest {
@ -22,19 +19,20 @@ public class SparkCreateDedupTest {
@Before
public void setUp() throws IOException {
configuration = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/org.curr.conf.json"));
// configuration = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/org.curr.conf.json"));
configuration = "";
}
@Test
@Ignore
public void createSimRelsTest() throws Exception {
SparkCreateSimRels.main(new String[] {
SparkCreateSimRels.main(new String[]{
"-mt", "local[*]",
"-s", "/Users/miconis/dumps",
"-e", entity,
"-c", ArgumentApplicationParser.compressArgument(configuration),
"-t", "/tmp/dedup",
"-i", "/Users/miconis/dumps",
"-o", "/tmp/dedup/rawset_test",
"-asi", "dedup-similarity-result-levenstein",
"-la", "lookupurl",
"-w", "workingPath"
});
}
@ -42,7 +40,7 @@ public class SparkCreateDedupTest {
@Ignore
public void createCCTest() throws Exception {
SparkCreateConnectedComponent.main(new String[] {
SparkCreateConnectedComponent.main(new String[]{
"-mt", "local[*]",
"-s", "/Users/miconis/dumps",
"-e", entity,
@ -54,7 +52,7 @@ public class SparkCreateDedupTest {
@Test
@Ignore
public void dedupRecordTest() throws Exception {
SparkCreateDedupRecord.main(new String[] {
SparkCreateDedupRecord.main(new String[]{
"-mt", "local[*]",
"-s", "/Users/miconis/dumps",
"-e", entity,
@ -64,23 +62,22 @@ public class SparkCreateDedupTest {
}
@Test
@Ignore
public void printConfiguration() throws Exception {
System.out.println(ArgumentApplicationParser.compressArgument(configuration));
}
@Test
@Ignore
public void testHashCode() {
final String s1 = "20|grid________::6031f94bef015a37783268ec1e75f17f";
final String s2 = "20|nsf_________::b12be9edf414df8ee66b4c52a2d8da46";
final HashFunction hashFunction = Hashing.murmur3_128();
System.out.println( s1.hashCode());
System.out.println(hashFunction.hashUnencodedChars(s1).asLong());
System.out.println( s2.hashCode());
System.out.println(hashFunction.hashUnencodedChars(s2).asLong());
System.out.println(s1.hashCode());
System.out.println(hashFunction.hashString(s1).asLong());
System.out.println(s2.hashCode());
System.out.println(hashFunction.hashString(s2).asLong());
}
}

File diff suppressed because one or more lines are too long

View File

@ -1,280 +0,0 @@
{
"wf" : {
"threshold" : "0.99",
"dedupRun" : "001",
"entityType" : "result",
"subEntityType" : "resulttype",
"subEntityValue" : "publication",
"orderField" : "title",
"queueMaxSize" : "2000",
"groupMaxSize" : "100",
"maxChildren" : "100",
"idPath": "$.id",
"slidingWindowSize" : "200",
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_isAffiliatedWith", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
"includeChildren" : "true"
},
"pace" : {
"clustering" : [
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
],
"strictConditions" : [
{ "name" : "pidMatch", "fields" : [ "pid" ] }
],
"conditions" : [
{ "name" : "titleVersionMatch", "fields" : [ "title" ] },
{ "name" : "sizeMatch", "fields" : [ "authors" ] }
],
"model" : [
{ "name" : "doi", "algo" : "Null", "type" : "String", "weight" : "0.0", "ignoreMissing" : "true", "path" : "$.pid[?(@.qualifier.classid ==\"doi\")].value" },
{ "name" : "pid", "algo" : "Null", "type" : "JSON", "weight" : "0.0", "ignoreMissing" : "true", "path" : "$.pid", "overrideMatch" : "true" },
{ "name" : "title", "algo" : "LevensteinTitle", "type" : "String", "weight" : "1.0", "ignoreMissing" : "false", "path" : "$.title[?(@.qualifier.classid ==\"main title\")].value", "length" : 250, "size" : 5 },
{ "name" : "authors", "algo" : "Null", "type" : "List", "weight" : "0.0", "ignoreMissing" : "true", "path" : "$.author[*].fullname", "size" : 200 },
{ "name" : "resulttype", "algo" : "Null", "type" : "String", "weight" : "0.0", "ignoreMissing" : "false", "path" : "$.resulttype.classid" }
],
"synonyms": {},
"blacklists" : {
"title" : [
"^Inside Front Cover$",
"(?i)^Poster presentations$",
"^THE ASSOCIATION AND THE GENERAL MEDICAL COUNCIL$",
"^Problems with perinatal pathology\\.?$",
"(?i)^Cases? of Puerperal Convulsions$",
"(?i)^Operative Gyna?ecology$",
"(?i)^Mind the gap\\!?\\:?$",
"^Chronic fatigue syndrome\\.?$",
"^Cartas? ao editor Letters? to the Editor$",
"^Note from the Editor$",
"^Anesthesia Abstract$",
"^Annual report$",
"(?i)^“?THE RADICAL PREVENTION OF VENEREAL DISEASE\\.?”?$",
"(?i)^Graph and Table of Infectious Diseases?$",
"^Presentation$",
"(?i)^Reviews and Information on Publications$",
"(?i)^PUBLIC HEALTH SERVICES?$",
"(?i)^COMBINED TEXT-?BOOK OF OBSTETRICS AND GYN(Æ|ae)COLOGY$",
"(?i)^Adrese autora$",
"(?i)^Systematic Part .*\\. Catalogus Fossilium Austriae, Band 2: Echinoidea neogenica$",
"(?i)^Acknowledgement to Referees$",
"(?i)^Behçet's disease\\.?$",
"(?i)^Isolation and identification of restriction endonuclease.*$",
"(?i)^CEREBROVASCULAR DISEASES?.?$",
"(?i)^Screening for abdominal aortic aneurysms?\\.?$",
"^Event management$",
"(?i)^Breakfast and Crohn's disease.*\\.?$",
"^Cálculo de concentraciones en disoluciones acuosas. Ejercicio interactivo\\..*\\.$",
"(?i)^Genetic and functional analyses of SHANK2 mutations suggest a multiple hit model of Autism spectrum disorders?\\.?$",
"^Gushi hakubutsugaku$",
"^Starobosanski nadpisi u Bosni i Hercegovini \\(.*\\)$",
"^Intestinal spirocha?etosis$",
"^Treatment of Rodent Ulcer$",
"(?i)^\\W*Cloud Computing\\W*$",
"^Compendio mathematico : en que se contienen todas las materias mas principales de las Ciencias que tratan de la cantidad$",
"^Free Communications, Poster Presentations: Session [A-F]$",
"^“The Historical Aspects? of Quackery\\.?”$",
"^A designated centre for people with disabilities operated by St John of God Community Services (Limited|Ltd), Louth$",
"^P(er|re)-Mile Premiums for Auto Insurance\\.?$",
"(?i)^Case Report$",
"^Boletín Informativo$",
"(?i)^Glioblastoma Multiforme$",
"(?i)^Nuevos táxones animales descritos en la península Ibérica y Macaronesia desde 1994 \\(.*\\)$",
"^Zaměstnanecké výhody$",
"(?i)^The Economics of Terrorism and Counter-Terrorism: A Survey \\(Part .*\\)$",
"(?i)^Carotid body tumours?\\.?$",
"(?i)^\\[Españoles en Francia : La condición Emigrante.*\\]$",
"^Avant-propos$",
"(?i)^St\\. Patrick's Cathedral, Dublin, County Dublin - Head(s)? and Capital(s)?$",
"(?i)^St\\. Patrick's Cathedral, Dublin, County Dublin - Bases?$",
"(?i)^PUBLIC HEALTH VERSUS THE STATE$",
"^Viñetas de Cortázar$",
"(?i)^Search for heavy neutrinos and W(\\[|_|\\(|_\\{|-)?R(\\]|\\)|\\})? bosons with right-handed couplings in a left-right symmetric model in pp collisions at.*TeV(\\.)?$",
"(?i)^Measurement of the pseudorapidity and centrality dependence of the transverse energy density in Pb(-?)Pb collisions at.*tev(\\.?)$",
"(?i)^Search for resonances decaying into top-quark pairs using fully hadronic decays in pp collisions with ATLAS at.*TeV$",
"(?i)^Search for neutral minimal supersymmetric standard model Higgs bosons decaying to tau pairs in pp collisions at.*tev$",
"(?i)^Relatório de Estágio (de|em) Angiologia e Cirurgia Vascular$",
"^Aus der AGMB$",
"^Znanstveno-stručni prilozi$",
"(?i)^Zhodnocení finanční situace podniku a návrhy na zlepšení$",
"(?i)^Evaluation of the Financial Situation in the Firm and Proposals to its Improvement$",
"(?i)^Hodnocení finanční situace podniku a návrhy na její zlepšení$",
"^Finanční analýza podniku$",
"^Financial analysis( of business)?$",
"(?i)^Textbook of Gyn(a)?(Æ)?(e)?cology$",
"^Jikken nihon shūshinsho$",
"(?i)^CORONER('|s)(s|') INQUESTS$",
"(?i)^(Μελέτη παραγόντων )?risk management( για ανάπτυξη και εφαρμογή ενός πληροφοριακού συστήματος| και ανάπτυξη συστήματος)?$",
"(?i)^Consultants' contract(s)?$",
"(?i)^Upute autorima$",
"(?i)^Bijdrage tot de Kennis van den Godsdienst der Dajaks van Lan(d|f)ak en Tajan$",
"^Joshi shin kokubun$",
"^Kōtō shōgaku dokuhon nōson'yō$",
"^Jinjō shōgaku shōka$",
"^Shōgaku shūjichō$",
"^Nihon joshi dokuhon$",
"^Joshi shin dokuhon$",
"^Chūtō kanbun dokuhon$",
"^Wabun dokuhon$",
"(?i)^(Analysis of economy selected village or town|Rozbor hospodaření vybrané obce či města)$",
"(?i)^cardiac rehabilitation$",
"(?i)^Analytical summary$",
"^Thesaurus resolutionum Sacrae Congregationis Concilii$",
"(?i)^Sumario analítico(\\s{1})?(Analitic summary)?$",
"^Prikazi i osvrti$",
"^Rodinný dům s provozovnou$",
"^Family house with an establishment$",
"^Shinsei chūtō shin kokugun$",
"^Pulmonary alveolar proteinosis(\\.?)$",
"^Shinshū kanbun$",
"^Viñeta(s?) de Rodríguez$",
"(?i)^RUBRIKA UREDNIKA$",
"^A Matching Model of the Academic Publication Market$",
"^Yōgaku kōyō$",
"^Internetový marketing$",
"^Internet marketing$",
"^Chūtō kokugo dokuhon$",
"^Kokugo dokuhon$",
"^Antibiotic Cover for Dental Extraction(s?)$",
"^Strategie podniku$",
"^Strategy of an Enterprise$",
"(?i)^respiratory disease(s?)(\\.?)$",
"^Award(s?) for Gallantry in Civil Defence$",
"^Podniková kultura$",
"^Corporate Culture$",
"^Severe hyponatraemia in hospital inpatient(s?)(\\.?)$",
"^Pracovní motivace$",
"^Work Motivation$",
"^Kaitei kōtō jogaku dokuhon$",
"^Konsolidovaná účetní závěrka$",
"^Consolidated Financial Statements$",
"(?i)^intracranial tumour(s?)$",
"^Climate Change Mitigation Options and Directed Technical Change: A Decentralized Equilibrium Analysis$",
"^\\[CERVECERIAS MAHOU(\\.|\\:) INTERIOR\\] \\[Material gráfico\\]$",
"^Housing Market Dynamics(\\:|\\.) On the Contribution of Income Shocks and Credit Constraint(s?)$",
"^\\[Funciones auxiliares de la música en Radio París,.*\\]$",
"^Úroveň motivačního procesu jako způsobu vedení lidí$",
"^The level of motivation process as a leadership$",
"^Pay-beds in N(\\.?)H(\\.?)S(\\.?) Hospitals$",
"(?i)^news and events$",
"(?i)^NOVOSTI I DOGAĐAJI$",
"^Sansū no gakushū$",
"^Posouzení informačního systému firmy a návrh změn$",
"^Information System Assessment and Proposal for ICT Modification$",
"^Stresové zatížení pracovníků ve vybrané profesi$",
"^Stress load in a specific job$",
"^Sunday: Poster Sessions, Pt.*$",
"^Monday: Poster Sessions, Pt.*$",
"^Wednesday: Poster Sessions, Pt.*",
"^Tuesday: Poster Sessions, Pt.*$",
"^Analýza reklamy$",
"^Analysis of advertising$",
"^Shōgaku shūshinsho$",
"^Shōgaku sansū$",
"^Shintei joshi kokubun$",
"^Taishō joshi kokubun dokuhon$",
"^Joshi kokubun$",
"^Účetní uzávěrka a účetní závěrka v ČR$",
"(?i)^The \"?Causes\"? of Cancer$",
"^Normas para la publicación de artículos$",
"^Editor('|s)(s|') [Rr]eply$",
"^Editor(|s)(s|) letter$",
"^Redaktoriaus žodis$",
"^DISCUSSION ON THE PRECEDING PAPER$",
"^Kōtō shōgaku shūshinsho jidōyō$",
"^Shōgaku nihon rekishi$",
"^(Theory of the flow of action currents in isolated myelinated nerve fibers).*$",
"^Préface$",
"^Occupational [Hh]ealth [Ss]ervices.$",
"^In Memoriam Professor Toshiyuki TAKESHIMA$",
"^Účetní závěrka ve vybraném podniku.*$",
"^Financial statements in selected company$",
"^Abdominal [Aa]ortic [Aa]neurysms.*$",
"^Pseudomyxoma peritonei$",
"^Kazalo autora$",
"(?i)^uvodna riječ$",
"^Motivace jako způsob vedení lidí$",
"^Motivation as a leadership$",
"^Polyfunkční dům$",
"^Multi\\-funkcional building$",
"^Podnikatelský plán$",
"(?i)^Podnikatelský záměr$",
"(?i)^Business Plan$",
"^Oceňování nemovitostí$",
"^Marketingová komunikace$",
"^Marketing communication$",
"^Sumario Analítico$",
"^Riječ uredništva$",
"^Savjetovanja i priredbe$",
"^Índice$",
"^(Starobosanski nadpisi).*$",
"^Vzdělávání pracovníků v organizaci$",
"^Staff training in organization$",
"^(Life Histories of North American Geometridae).*$",
"^Strategická analýza podniku$",
"^Strategic Analysis of an Enterprise$",
"^Sadržaj$",
"^Upute suradnicima$",
"^Rodinný dům$",
"(?i)^Fami(l)?ly house$",
"^Upute autorima$",
"^Strategic Analysis$",
"^Finanční analýza vybraného podniku$",
"^Finanční analýza$",
"^Riječ urednika$",
"(?i)^Content(s?)$",
"(?i)^Inhalt$",
"^Jinjō shōgaku shūshinsho jidōyō$",
"(?i)^Index$",
"^Chūgaku kokubun kyōkasho$",
"^Retrato de una mujer$",
"^Retrato de un hombre$",
"^Kōtō shōgaku dokuhon$",
"^Shotōka kokugo$",
"^Shōgaku dokuhon$",
"^Jinjō shōgaku kokugo dokuhon$",
"^Shinsei kokugo dokuhon$",
"^Teikoku dokuhon$",
"^Instructions to Authors$",
"^KİTAP TAHLİLİ$",
"^PRZEGLĄD PIŚMIENNICTWA$",
"(?i)^Presentación$",
"^İçindekiler$",
"(?i)^Tabl?e of contents$",
"^(CODICE DEL BEATO DE LOS REYES FERNANDO I Y SANCHA).*$",
"^(\\[MADRID\\. BIBL\\. NAC\\. N.*KING FERDINAND I.*FROM SAN ISIDORO DE LEON\\. FACUNDUS SCRIPSIT DATED.*\\]).*",
"^Editorial( Board)?$",
"(?i)^Editorial \\(English\\)$",
"^Editörden$",
"^(Corpus Oral Dialectal \\(COD\\)\\.).*$",
"^(Kiri Karl Morgensternile).*$",
"^(\\[Eksliibris Aleksandr).*\\]$",
"^(\\[Eksliibris Aleksandr).*$",
"^(Eksliibris Aleksandr).*$",
"^(Kiri A\\. de Vignolles).*$",
"^(2 kirja Karl Morgensternile).*$",
"^(Pirita kloostri idaosa arheoloogilised).*$",
"^(Kiri tundmatule).*$",
"^(Kiri Jenaer Allgemeine Literaturzeitung toimetusele).*$",
"^(Eksliibris Nikolai Birukovile).*$",
"^(Eksliibris Nikolai Issakovile).*$",
"^(WHP Cruise Summary Information of section).*$",
"^(Measurement of the top quark\\-pair production cross section with ATLAS in pp collisions at).*$",
"^(Measurement of the spin\\-dependent structure function).*",
"(?i)^.*authors[']? reply\\.?$",
"(?i)^.*authors[']? response\\.?$"
]
}
}
}

View File

@ -3,6 +3,7 @@
"threshold" : "0.99",
"dedupRun" : "001",
"entityType" : "organization",
"subEntityValue": "organization",
"orderField" : "legalname",
"queueMaxSize" : "2000",
"groupMaxSize" : "50",
@ -87,8 +88,8 @@
}
}
],
"threshold": 0.7,
"aggregation": "W_MEAN",
"threshold": 0.1,
"aggregation": "AVG",
"positive": "layer4",
"negative": "NO_MATCH",
"undefined": "NO_MATCH",
@ -106,7 +107,7 @@
}
}
],
"threshold": 0.9,
"threshold": 0.7,
"aggregation": "AVG",
"positive": "layer5",
"negative": "NO_MATCH",
@ -129,7 +130,9 @@
"comparator": "jaroWinklerNormalizedName",
"weight": 0.1,
"countIfUndefined": "false",
"params": {}
"params": {
"windowSize": 4
}
}
],
"threshold": 0.9,
@ -145,14 +148,14 @@
{ "name" : "legalshortname", "type" : "String", "path" : "$.legalshortname.value"},
{ "name" : "legalname", "type" : "String", "path" : "$.legalname.value" },
{ "name" : "websiteurl", "type" : "URL", "path" : "$.websiteurl.value" },
{ "name" : "gridid", "type" : "String", "path" : "$.pid[?(@.qualifier.classid =='grid.ac')].value"},
{ "name" : "gridid", "type" : "String", "path" : "$.pid[?(@.qualifier.classid =='grid')].value"},
{ "name" : "originalId", "type" : "String", "path" : "$.id" }
],
"blacklists" : {
"legalname" : []
},
"synonyms": {
"key::1": ["university","università","università studi","universitario","universitaria","université", "universite", "universitaire","universitaires","universidad","universitade","Universität","universitaet","Uniwersytet","университет","universiteit","πανεπιστήμιο","universitesi","universiteti", "universiti"],
"key::1": ["university","università", "universitas", "università studi","universitario","universitaria","université", "universite", "universitaire","universitaires","universidad","universitade","Universität","universitaet","Uniwersytet","университет","universiteit","πανεπιστήμιο","universitesi","universiteti", "universiti"],
"key::2": ["studies","studi","études","estudios","estudos","Studien","studia","исследования","studies","σπουδές"],
"key::3": ["advanced","superiore","supérieur","supérieure","supérieurs","supérieures","avancado","avancados","fortgeschrittene","fortgeschritten","zaawansowany","передовой","gevorderd","gevorderde","προχωρημένος","προχωρημένη","προχωρημένο","προχωρημένες","προχωρημένα","wyzsza"],
"key::4": ["institute","istituto","institut","instituto","instituto","Institut","instytut","институт","instituut","ινστιτούτο"],

View File

@ -28,34 +28,10 @@
"idPath": "$.id"
},
"pace": {
"clustering": [
{
"name": "ngrampairs",
"fields": [
"title"
],
"params": {
"max": "1",
"ngramLen": "3"
}
},
{
"name": "suffixprefix",
"fields": [
"title"
],
"params": {
"max": "1",
"len": "3"
}
},
{
"name": "lowercase",
"fields": [
"doi"
],
"params": {}
}
"clustering" : [
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
],
"decisionTree": {
"start": {

View File

@ -1,8 +0,0 @@
CREATE view result as
select id, dateofcollection, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.publication p
union all
select id, dateofcollection, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.dataset d
union all
select id, dateofcollection, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.software s
union all
select id, dateofcollection, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.otherresearchproduct o;

View File

@ -19,6 +19,10 @@
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hive_jdbc_url</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hive_db_name</name>
<value>openaire</value>

View File

@ -0,0 +1,10 @@
DROP VIEW IF EXISTS ${hive_db_name}.result;
CREATE VIEW IF NOT EXISTS result as
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.publication p
union all
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.dataset d
union all
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.software s
union all
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.otherresearchproduct o;

View File

@ -1,4 +1,5 @@
<workflow-app name="import_infospace_graph" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
@ -22,19 +23,32 @@
</property>
</parameters>
<start to="MapGraphIntoDataFrame"/>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
</configuration>
</global>
<start to="MapGraphAsHiveDB"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="MapGraphIntoDataFrame">
<action name="MapGraphAsHiveDB">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<master>yarn</master>
<mode>cluster</mode>
<name>MapGraphIntoDataFrame</name>
<name>MapGraphAsHiveDB</name>
<class>eu.dnetlib.dhp.graph.SparkGraphImporterJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
@ -55,12 +69,19 @@
</action>
<action name="PostProcessing">
<hive xmlns="uri:oozie:hive-action:0.2">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<script>/eu/dnetlib/dhp/graph/hive/postprocessing.sql</script>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>${hive_metastore_uris}</value>
</property>
</configuration>
<jdbc-url>${hive_jdbc_url}/${hive_db_name}</jdbc-url>
<script>lib/scripts/postprocessing.sql</script>
<param>hive_db_name=${hive_db_name}</param>
</hive>
</hive2>
<ok to="End"/>
<error to="Kill"/>
</action>

View File

@ -1,12 +1,14 @@
sparkDriverMemory=10G
sparkExecutorMemory=15G
sparkExecutorCoresForJoining=1
sparkDriverMemoryForJoining=10G
sparkExecutorMemoryForJoining=15G
sparkExecutorCoresForIndexing=64
sparkDriverMemoryForIndexing=3G
sparkExecutorMemoryForIndexing=2G
#isLookupUrl=http://services.openaire.eu:8280/is/services/isLookUp
isLookupUrl=http://beta.services.openaire.eu:8280/is/services/isLookUp?wsdl
sourcePath=/tmp/db_openaireplus_services.export_dhp.2020.02.03
outputPath=/tmp/openaire_provision
format=TMF
batchSize=2000
sparkExecutorCoresForJoining=128
sparkExecutorCoresForIndexing=64
reuseRecords=false
otherDsTypeId=scholarcomminfra, infospace, pubsrepository::mock, entityregistry, entityregistry::projects, entityregistry::repositories, websource

View File

@ -1,4 +1,5 @@
<workflow-app name="index_infospace_graph" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>hive_db_name</name>
@ -26,6 +27,21 @@
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
</configuration>
</global>
<start to="reuse_records"/>
<decision name="reuse_records">
@ -42,8 +58,6 @@
<action name="adjancency_lists">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>cluster</mode>
<name>build_adjacency_lists</name>
@ -71,8 +85,6 @@
<action name="to_solr_index">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>cluster</mode>
<name>to_solr_index</name>

View File

@ -148,6 +148,13 @@
<version>${dhp.commons.lang.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${dhp.guava.version}</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
@ -496,9 +503,9 @@
<dhp.spark.version>2.4.0.cloudera2</dhp.spark.version>
<dhp.jackson.version>2.9.6</dhp.jackson.version>
<dhp.commons.lang.version>3.5</dhp.commons.lang.version>
<dhp.guava.version>11.0.2</dhp.guava.version>
<scala.version>2.11.12</scala.version>
<junit.version>4.12</junit.version>
<mongodb.driver.version>3.4.2</mongodb.driver.version>
</properties>
</project>