dedupRecord = DedupRecordFactory
- .createDedupRecord(
- sc,
- spark,
- DedupUtility.createMergeRelPath(dedupPath, entity),
- DedupUtility.createEntityPath(sourcePath, entity),
- OafEntityType.valueOf(entity),
- dedupConf);
- spark
- .createDataset(dedupRecord.rdd(), Encoders.kryo(OafEntity.class))
- .write()
- .mode(SaveMode.Overwrite)
- .save(dedupPath + "/" + entity + "/dedup_records");
-// dedupRecord
-// .map(
-// r -> {
-// ObjectMapper mapper = new ObjectMapper();
-// return mapper.writeValueAsString(r);
-// })
-// .saveAsTextFile(dedupPath + "/" + entity + "/dedup_records");
- }
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/
deleted file mode 100644
index 7adf992cd2..0000000000
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/
+++ /dev/null
@@ -1,92 +0,0 @@
-package eu.dnetlib.dedup;
-import java.util.List;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SparkSession;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.pace.config.DedupConfig;
-import eu.dnetlib.pace.model.MapDocument;
-import eu.dnetlib.pace.util.MapDocumentUtil;
-import scala.Tuple2;
- * This Spark class creates similarity relations between entities, saving result
- *
- * param request: sourcePath entityType target Path
- */
-public class SparkCreateSimRels {
- public static void main(String[] args) throws Exception {
- final ArgumentApplicationParser parser = new ArgumentApplicationParser(
- IOUtils
- .toString(
- SparkCreateSimRels.class
- .getResourceAsStream(
- "/eu/dnetlib/dhp/sx/dedup/dedup_parameters.json")));
- parser.parseArgument(args);
- final SparkSession spark = SparkSession
- .builder()
- .appName(SparkCreateSimRels.class.getSimpleName())
- .master(parser.get("master"))
- .getOrCreate();
- final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
- final String inputPath = parser.get("sourcePath");
- final String entity = parser.get("entity");
- final String targetPath = parser.get("targetPath");
- // final DedupConfig dedupConf =
- // DedupConfig.load(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
- final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf"));
- JavaPairRDD mapDocument = spark
- .read()
- .load(inputPath + "/" + entity)
- .as(Encoders.kryo(Oaf.class))
- .map((MapFunction) p -> new ObjectMapper().writeValueAsString(p), Encoders.STRING())
- .javaRDD()
- .repartition(1000)
- .mapToPair(
- s -> {
- MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
- return new Tuple2<>(d.getIdentifier(), d);
- });
- // create blocks for deduplication
- JavaPairRDD> blocks = Deduper.createsortedBlocks(sc, mapDocument, dedupConf);
- // JavaPairRDD> blocks = Deduper.createBlocks(sc,
- // mapDocument, dedupConf);
- // create relations by comparing only elements in the same group
- final JavaPairRDD dedupRels = Deduper.computeRelations2(sc, blocks, dedupConf);
- // final JavaPairRDD dedupRels = Deduper.computeRelations(sc, blocks,
- // dedupConf);
- final JavaRDD isSimilarToRDD = dedupRels
- .map(
- simRel -> {
- final Relation r = new Relation();
- r.setSource(simRel._1());
- r.setTarget(simRel._2());
- r.setRelClass("isSimilarTo");
- return r;
- });
- spark
- .createDataset(isSimilarToRDD.rdd(), Encoders.bean(Relation.class))
- .write()
- .mode("overwrite")
- .save(DedupUtility.createSimRelPath(targetPath, entity));
- }
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/
deleted file mode 100644
index 21e72b5b8d..0000000000
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/
+++ /dev/null
@@ -1,52 +0,0 @@
-package eu.dnetlib.dedup;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.spark.util.LongAccumulator;
-import eu.dnetlib.pace.util.Reporter;
-import scala.Serializable;
-import scala.Tuple2;
-public class SparkReporter implements Serializable, Reporter {
- final List> relations = new ArrayList<>();
- private static final Log log = LogFactory.getLog(SparkReporter.class);
- Map accumulators;
- public SparkReporter(Map accumulators) {
- this.accumulators = accumulators;
- }
- public void incrementCounter(
- String counterGroup,
- String counterName,
- long delta,
- Map accumulators) {
- final String accumulatorName = String.format("%s::%s", counterGroup, counterName);
- if (accumulators.containsKey(accumulatorName)) {
- accumulators.get(accumulatorName).add(delta);
- }
- }
- @Override
- public void incrementCounter(String counterGroup, String counterName, long delta) {
- incrementCounter(counterGroup, counterName, delta, accumulators);
- }
- @Override
- public void emit(String type, String from, String to) {
- relations.add(new Tuple2<>(from, to));
- }
- public List> getRelations() {
- return relations;
- }
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/
deleted file mode 100644
index 79a3114fda..0000000000
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/
+++ /dev/null
@@ -1,84 +0,0 @@
-package eu.dnetlib.dedup.graph;
-import java.util.Set;
-import org.apache.commons.lang.StringUtils;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dedup.DedupUtility;
-import eu.dnetlib.pace.util.PaceException;
-public class ConnectedComponent implements Serializable {
- private Set docIds;
- private String ccId;
- public ConnectedComponent() {
- }
- public ConnectedComponent(Set docIds) {
- this.docIds = docIds;
- createID();
- }
- public String createID() {
- if (docIds.size() > 1) {
- final String s = getMin();
- String prefix = s.split("\\|")[0];
- ccId = prefix + "|dedup_wf_001::" + DedupUtility.md5(s);
- return ccId;
- } else {
- return docIds.iterator().next();
- }
- }
- @JsonIgnore
- public String getMin() {
- final StringBuilder min = new StringBuilder();
- docIds
- .forEach(
- i -> {
- if (StringUtils.isBlank(min.toString())) {
- min.append(i);
- } else {
- if (min.toString().compareTo(i) > 0) {
- min.setLength(0);
- min.append(i);
- }
- }
- });
- return min.toString();
- }
- @Override
- public String toString() {
- ObjectMapper mapper = new ObjectMapper();
- try {
- return mapper.writeValueAsString(this);
- } catch (IOException e) {
- throw new PaceException("Failed to create Json: ", e);
- }
- }
- public Set getDocIds() {
- return docIds;
- }
- public void setDocIds(Set docIds) {
- this.docIds = docIds;
- }
- public String getCcId() {
- return ccId;
- }
- public void setCcId(String ccId) {
- this.ccId = ccId;
- }
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/GraphProcessor.scala b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/GraphProcessor.scala
deleted file mode 100644
index 38c6951528..0000000000
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/GraphProcessor.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-package eu.dnetlib.dedup.graph
-import org.apache.spark.graphx._
-import org.apache.spark.rdd.RDD
-import scala.collection.JavaConversions;
-object GraphProcessor {
- def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int): RDD[ConnectedComponent] = {
- val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby
- val cc = graph.connectedComponents(maxIterations).vertices
- val joinResult = vertexes.leftOuterJoin(cc).map {
- case (id, (openaireId, cc)) => {
- if (cc.isEmpty) {
- (id, openaireId)
- }
- else {
- (cc.get, openaireId)
- }
- }
- }
- val connectedComponents = joinResult.groupByKey()
- .map[ConnectedComponent](cc => asConnectedComponent(cc))
- connectedComponents
- }
- def asConnectedComponent(group: (VertexId, Iterable[String])): ConnectedComponent = {
- val docs = group._2.toSet[String]
- val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs));
- connectedComponent
- }
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/
deleted file mode 100644
index 3134f94000..0000000000
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/
+++ /dev/null
@@ -1,78 +0,0 @@
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.scholexplorer.OafUtils;
-import scala.Tuple2;
-public class SparkPropagateRelationsJob {
- public static void main(String[] args) throws Exception {
- final ArgumentApplicationParser parser = new ArgumentApplicationParser(
- IOUtils
- .toString(
- SparkPropagateRelationsJob.class
- .getResourceAsStream(
- "/eu/dnetlib/dhp/sx/dedup/dedup_propagate_relation_parameters.json")));
- parser.parseArgument(args);
- final SparkSession spark = SparkSession
- .builder()
- .appName(SparkUpdateEntityJob.class.getSimpleName())
- .master(parser.get("master"))
- .getOrCreate();
- final String relationPath = parser.get("relationPath");
- final String mergeRelPath = parser.get("mergeRelPath");
- final String targetRelPath = parser.get("targetRelPath");
- final Dataset merge = spark
- .read()
- .load(mergeRelPath)
- .as(Encoders.bean(Relation.class))
- .where("relClass == 'merges'");
- final Dataset rels = spark
- .read()
- .load(relationPath)
- .as(Encoders.kryo(Relation.class))
- .map(
- (MapFunction) r -> r,
- Encoders.bean(Relation.class));
- final Dataset firstJoin = rels
- .joinWith(merge, merge.col("target").equalTo(rels.col("source")), "left_outer")
- .map(
- (MapFunction, Relation>) r -> {
- final Relation mergeRelation = r._2();
- final Relation relation = r._1();
- if (mergeRelation != null)
- relation.setSource(mergeRelation.getSource());
- if (relation.getDataInfo() == null)
- relation.setDataInfo(OafUtils.generateDataInfo("0.9", false));
- return relation;
- },
- Encoders.bean(Relation.class));
- final Dataset secondJoin = firstJoin
- .joinWith(merge, merge.col("target").equalTo(firstJoin.col("target")), "left_outer")
- .map(
- (MapFunction, Relation>) r -> {
- final Relation mergeRelation = r._2();
- final Relation relation = r._1();
- if (mergeRelation != null)
- relation.setTarget(mergeRelation.getSource());
- return relation;
- },
- Encoders.kryo(Relation.class));
- secondJoin.write().mode(SaveMode.Overwrite).save(targetRelPath);
- }
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/
deleted file mode 100644
index a847ad6125..0000000000
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/
+++ /dev/null
@@ -1,102 +0,0 @@
-import org.apache.spark.sql.*;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset;
-import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication;
-import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown;
-import eu.dnetlib.dhp.utils.DHPUtils;
-import scala.Tuple2;
-public class SparkUpdateEntityJob {
- static final String IDJSONPATH = "$.id";
- public static void main(String[] args) throws Exception {
- final ArgumentApplicationParser parser = new ArgumentApplicationParser(
- IOUtils
- .toString(
- SparkUpdateEntityJob.class
- .getResourceAsStream(
- "/eu/dnetlib/dhp/sx/dedup/dedup_delete_by_inference_parameters.json")));
- parser.parseArgument(args);
- final SparkSession spark = SparkSession
- .builder()
- .appName(SparkUpdateEntityJob.class.getSimpleName())
- .master(parser.get("master"))
- .getOrCreate();
- final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
- final String entityPath = parser.get("entityPath");
- final String mergeRelPath = parser.get("mergeRelPath");
- final String dedupRecordPath = parser.get("dedupRecordPath");
- final String entity = parser.get("entity");
- final String destination = parser.get("targetPath");
- final Dataset df =;
- final JavaPairRDD mergedIds = df
- .where("relClass == 'merges'")
- .select(df.col("target"))
- .distinct()
- .toJavaRDD()
- .mapToPair((PairFunction) r -> new Tuple2<>(r.getString(0), "d"));
- final JavaRDD sourceEntity = sc.textFile(entityPath);
- final JavaRDD dedupEntity = sc.textFile(dedupRecordPath);
- JavaPairRDD entitiesWithId = sourceEntity
- .mapToPair(
- (PairFunction) s -> new Tuple2<>(DHPUtils.getJPathString(IDJSONPATH, s), s));
- Class extends Oaf> mainClass;
- switch (entity) {
- case "publication":
- mainClass = DLIPublication.class;
- break;
- case "dataset":
- mainClass = DLIDataset.class;
- break;
- case "unknown":
- mainClass = DLIUnknown.class;
- break;
- default:
- throw new IllegalArgumentException("Illegal type " + entity);
- }
- JavaRDD map = entitiesWithId
- .leftOuterJoin(mergedIds)
- .map(
- k -> k._2()._2().isPresent()
- ? updateDeletedByInference(k._2()._1(), mainClass)
- : k._2()._1());
- map.union(dedupEntity).saveAsTextFile(destination, GzipCodec.class);
- }
- private static String updateDeletedByInference(
- final String json, final Class clazz) {
- final ObjectMapper mapper = new ObjectMapper();
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- try {
- Oaf entity = mapper.readValue(json, clazz);
- if (entity.getDataInfo() == null)
- entity.setDataInfo(new DataInfo());
- entity.getDataInfo().setDeletedbyinference(true);
- return mapper.writeValueAsString(entity);
- } catch (IOException e) {
- throw new RuntimeException("Unable to convert json", e);
- }
- }
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkUpdateEntityWithDedupInfo.scala b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkUpdateEntityWithDedupInfo.scala
deleted file mode 100644
index ce883e2072..0000000000
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkUpdateEntityWithDedupInfo.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity, Relation}
-import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown, OafUtils}
-import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
-import org.slf4j.LoggerFactory
-import org.apache.spark.sql.functions.col
-object SparkUpdateEntityWithDedupInfo {
- def main(args: Array[String]): Unit = {
- val parser = new ArgumentApplicationParser(IOUtils.toString(SparkUpdateEntityWithDedupInfo.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/dedup/dedup_delete_by_inference_parameters.json")))
- val logger = LoggerFactory.getLogger(SparkUpdateEntityWithDedupInfo.getClass)
- parser.parseArgument(args)
- val workingPath: String = parser.get("workingPath")
-"Working dir path = $workingPath")
- implicit val oafEncoder: Encoder[OafEntity] = Encoders.kryo[OafEntity]
- implicit val relEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation])
- implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
- implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
- implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown]
- val spark: SparkSession = SparkSession
- .builder()
- .appName(SparkUpdateEntityWithDedupInfo.getClass.getSimpleName)
- .master(parser.get("master"))
- .getOrCreate()
- val entityPath = parser.get("entityPath")
- val mergeRelPath = parser.get("mergeRelPath")
- val dedupRecordPath = parser.get("dedupRecordPath")
- val entity = parser.get("entity")
- val destination = parser.get("targetPath")
- val mergedIds =[Relation]
- .where("relClass == 'merges'")
- .select(col("target"))
- val entities: Dataset[(String, OafEntity)] = spark
- .read
- .load(entityPath).as[OafEntity]
- .map(o => (o.getId, o))(Encoders.tuple(Encoders.STRING, oafEncoder))
- val finalDataset:Dataset[OafEntity] = entities.joinWith(mergedIds, entities("_1").equalTo(mergedIds("target")), "left")
- .map(k => {
- val e: OafEntity = k._1._2
- val t = k._2
- if (t != null && t.getString(0).nonEmpty) {
- if (e.getDataInfo == null) {
- e.setDataInfo(OafUtils.generateDataInfo())
- }
- e.getDataInfo.setDeletedbyinference(true)
- }
- e
- })
- val dedupRecords :Dataset[OafEntity] =[OafEntity]
- finalDataset.union(dedupRecords)
- .repartition(1200).write
- .mode(SaveMode.Overwrite).save(destination)
- }
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/dedupRecord_parameters.json b/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/dedupRecord_parameters.json
deleted file mode 100644
index de744dfb63..0000000000
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/dedupRecord_parameters.json
+++ /dev/null
@@ -1,33 +0,0 @@
- {
- "paramName": "mt",
- "paramLongName": "master",
- "paramDescription": "should be local or yarn",
- "paramRequired": true
- },
- {
- "paramName": "s",
- "paramLongName": "sourcePath",
- "paramDescription": "the path of the sequential file to read",
- "paramRequired": true
- },
- {
- "paramName": "e",
- "paramLongName": "entity",
- "paramDescription": "the type of entity to be deduped",
- "paramRequired": true
- },
- {
- "paramName": "c",
- "paramLongName": "dedupConf",
- "paramDescription": "dedup configuration to be used",
- "compressed": true,
- "paramRequired": true
- },
- {
- "paramName": "d",
- "paramLongName": "dedupPath",
- "paramDescription": "dedup path to load mergeRelation",
- "paramRequired": true
- }
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/dedup_delete_by_inference_parameters.json b/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/dedup_delete_by_inference_parameters.json
deleted file mode 100644
index 69428a2963..0000000000
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/dedup_delete_by_inference_parameters.json
+++ /dev/null
@@ -1,38 +0,0 @@
- {
- "paramName": "mt",
- "paramLongName": "master",
- "paramDescription": "should be local or yarn",
- "paramRequired": true
- },
- {
- "paramName": "ep",
- "paramLongName": "entityPath",
- "paramDescription": "the input entity path",
- "paramRequired": true
- },
- {
- "paramName": "mr",
- "paramLongName": "mergeRelPath",
- "paramDescription": "the input path of merge Rel",
- "paramRequired": true
- },
- {
- "paramName": "dr",
- "paramLongName": "dedupRecordPath",
- "paramDescription": "the inputPath of dedup record",
- "paramRequired": true
- },
- {
- "paramName": "e",
- "paramLongName": "entity",
- "paramDescription": "the type of entity",
- "paramRequired": true
- },
- {
- "paramName": "t",
- "paramLongName": "targetPath",
- "paramDescription": "the targetPath",
- "paramRequired": true
- }
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/dedup_parameters.json b/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/dedup_parameters.json
deleted file mode 100644
index 8ba8515d0e..0000000000
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/dedup_parameters.json
+++ /dev/null
@@ -1,33 +0,0 @@
- {
- "paramName": "mt",
- "paramLongName": "master",
- "paramDescription": "should be local or yarn",
- "paramRequired": true
- },
- {
- "paramName": "s",
- "paramLongName": "sourcePath",
- "paramDescription": "the path of the sequential file to read",
- "paramRequired": true
- },
- {
- "paramName": "e",
- "paramLongName": "entity",
- "paramDescription": "the type of entity to be deduped",
- "paramRequired": true
- },
- {
- "paramName": "c",
- "paramLongName": "dedupConf",
- "paramDescription": "dedup configuration to be used",
- "compressed": true,
- "paramRequired": true
- },
- {
- "paramName": "t",
- "paramLongName": "targetPath",
- "paramDescription": "target path to save dedup result",
- "paramRequired": true
- }
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/dedup_propagate_relation_parameters.json b/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/dedup_propagate_relation_parameters.json
deleted file mode 100644
index 2ce78440fb..0000000000
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/dedup_propagate_relation_parameters.json
+++ /dev/null
@@ -1,26 +0,0 @@
- {
- "paramName": "mt",
- "paramLongName": "master",
- "paramDescription": "should be local or yarn",
- "paramRequired": true
- },
- {
- "paramName": "ep",
- "paramLongName": "relationPath",
- "paramDescription": "the input relation path",
- "paramRequired": true
- },
- {
- "paramName": "mr",
- "paramLongName": "mergeRelPath",
- "paramDescription": "the input path of merge Rel",
- "paramRequired": true
- },
- {
- "paramName": "t",
- "paramLongName": "targetRelPath",
- "paramDescription": "the output Rel Path",
- "paramRequired": true
- }
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/config-default.xml b/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/config-default.xml
deleted file mode 100644
index 2e0ed9aeea..0000000000
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/config-default.xml
+++ /dev/null
@@ -1,18 +0,0 @@
- jobTracker
- yarnRM
- nameNode
- hdfs://nameservice1
- oozie.use.system.libpath
- true
- oozie.action.sharelib.for.spark
- spark2
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/workflow.xml
deleted file mode 100644
index 2214fd20ac..0000000000
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/workflow.xml
+++ /dev/null
@@ -1,182 +0,0 @@
- sourcePath
- the source path
- entity
- the entity that should be processed
- dedupConf
- the dedup Configuration
- targetPath
- the target path
- sparkDriverMemory
- memory for driver process
- sparkExecutorMemory
- memory for individual executor
- Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
- ${jobTracker}
- ${nameNode}
- yarn-cluster
- cluster
- Create Similarity Relations
- eu.dnetlib.dedup.SparkCreateSimRels
- dhp-dedup-scholexplorer-${projectVersion}.jar
- --executor-memory ${sparkExecutorMemory}
- --driver-memory=${sparkDriverMemory}
- --executor-cores=${sparkExecutorCores}
- ${sparkExtraOPT}
- -mtyarn-cluster
- --sourcePath${sourcePath}
- --targetPath${targetPath}
- --entity${entity}
- --dedupConf${dedupConf}
- ${jobTracker}
- ${nameNode}
- yarn-cluster
- cluster
- Create Connected Components
- eu.dnetlib.dedup.SparkCreateConnectedComponent
- dhp-dedup-scholexplorer-${projectVersion}.jar
- --executor-memory ${sparkExecutorMemory}
- --driver-memory=${sparkDriverMemory}
- --executor-cores=${sparkExecutorCores}
- ${sparkExtraOPT}
- -mtyarn-cluster
- --sourcePath${sourcePath}
- --targetPath${targetPath}
- --entity${entity}
- --dedupConf${dedupConf}
- ${jobTracker}
- ${nameNode}
- yarn-cluster
- cluster
- Create Dedup Record
- eu.dnetlib.dedup.SparkCreateDedupRecord
- dhp-dedup-scholexplorer-${projectVersion}.jar
- --executor-memory ${sparkExecutorMemory}
- --driver-memory=${sparkDriverMemory}
- --executor-cores=${sparkExecutorCores}
- ${sparkExtraOPT}
- -mtyarn-cluster
- --sourcePath${sourcePath}
- --dedupPath${targetPath}
- --entity${entity}
- --dedupConf${dedupConf}
- ${jobTracker}
- ${nameNode}
- yarn-cluster
- cluster
- Propagate Dedup Relations
- dhp-dedup-scholexplorer-${projectVersion}.jar
- --executor-memory ${sparkExecutorMemory}
- --driver-memory=${sparkDriverMemory}
- --executor-cores=${sparkExecutorCores}
- ${sparkExtraOPT}
- -mtyarn-cluster
- --mergeRelPath${targetPath}/${entity}/mergeRel
- --relationPath${sourcePath}/relation
- --targetRelPath${targetPath}/${entity}/updated_relation
- ${jobTracker}
- ${nameNode}
- yarn-cluster
- cluster
- Update ${entity} and add DedupRecord
- dhp-dedup-scholexplorer-${projectVersion}.jar
- --executor-memory ${sparkExecutorMemory}
- --driver-memory=${sparkDriverMemory}
- --executor-cores=${sparkExecutorCores}
- ${sparkExtraOPT}
- -mtyarn-cluster
- --entityPath${sourcePath}/${entity}
- --mergeRelPath${targetPath}/${entity}/mergeRel
- --entity${entity}
- --dedupRecordPath${targetPath}/${entity}/dedup_records
- --targetPath${targetPath}/${entity}/updated_record
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/test/resources/eu/dnetlib/dedup/sx/conf/pub_scholix.conf.json b/dhp-workflows/dhp-dedup-scholexplorer/src/test/resources/eu/dnetlib/dedup/sx/conf/pub_scholix.conf.json
deleted file mode 100644
index d914198534..0000000000
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/test/resources/eu/dnetlib/dedup/sx/conf/pub_scholix.conf.json
+++ /dev/null
@@ -1,378 +0,0 @@
- "wf": {
- "threshold": "0.99",
- "dedupRun": "001",
- "entityType": "result",
- "subEntityType": "resulttype",
- "subEntityValue": "publication",
- "orderField": "title",
- "queueMaxSize": "2000",
- "groupMaxSize": "100",
- "maxChildren": "100",
- "slidingWindowSize": "200",
- "rootBuilder": [
- ],
- "includeChildren": "true",
- "maxIterations": 20,
- "idPath": "$.id"
- },
- "pace": {
- "clustering": [
- {
- "name": "ngrampairs",
- "fields": [
- "title"
- ],
- "params": {
- "max": "1",
- "ngramLen": "3"
- }
- },
- {
- "name": "suffixprefix",
- "fields": [
- "title"
- ],
- "params": {
- "max": "1",
- "len": "3"
- }
- }
- ],
- "decisionTree": {
- "start": {
- "fields": [
- {
- "field": "pid",
- "comparator": "jsonListMatch",
- "weight": 1.0,
- "countIfUndefined": "false",
- "params": {
- "jpath_value": "$.value",
- "jpath_classid": "$.qualifier.classid"
- }
- }
- ],
- "threshold": 0.5,
- "aggregation": "AVG",
- "positive": "MATCH",
- "negative": "layer2",
- "undefined": "layer2",
- "ignoreUndefined": "true"
- },
- "layer2": {
- "fields": [
- {
- "field": "title",
- "comparator": "titleVersionMatch",
- "weight": 1.0,
- "countIfUndefined": "false",
- "params": {}
- },
- {
- "field": "authors",
- "comparator": "sizeMatch",
- "weight": 1.0,
- "countIfUndefined": "false",
- "params": {}
- }
- ],
- "threshold": 1.0,
- "aggregation": "AND",
- "positive": "layer3",
- "negative": "NO_MATCH",
- "undefined": "layer3",
- "ignoreUndefined": "false"
- },
- "layer3": {
- "fields": [
- {
- "field": "title",
- "comparator": "levensteinTitle",
- "weight": 1.0,
- "countIfUndefined": "true",
- "params": {}
- }
- ],
- "threshold": 0.99,
- "aggregation": "AVG",
- "positive": "MATCH",
- "negative": "NO_MATCH",
- "undefined": "NO_MATCH",
- "ignoreUndefined": "true"
- }
- },
- "model": [
- {
- "name": "pid",
- "type": "JSON",
- "path": "$.pid",
- "overrideMatch": "true"
- },
- {
- "name": "title",
- "type": "String",
- "path": "$.title[*].value",
- "length": 250,
- "size": 5
- },
- {
- "name": "authors",
- "type": "List",
- "path": "$.author[*].fullname",
- "size": 200
- },
- {
- "name": "resulttype",
- "type": "String",
- "path": "$.resulttype.classid"
- }
- ],
- "blacklists": {
- "title": [
- "^Inside Front Cover$",
- "^CORR Insights$",
- "^Index des notions$",
- "^Department of Error.$",
- "^Untitled Item$",
- "^Department of Error$",
- "^Tome II : 1598 à 1605$",
- "^(à l’exception de roi, prince, royauté, pouvoir, image… qui sont omniprésents)$",
- "^Museen und Ausstellungsinstitute in Nürnberg$",
- "^Text/Conference Paper$",
- "^Table des illustrations$",
- "^An Intimate Insight on Psychopathy and a Novel Hermeneutic Psychological Science$",
- "^Index des noms$",
- "^Reply by Authors.$",
- "^Titelblatt - Inhalt$",
- "^Index des œuvres,$",
- "(?i)^Poster presentations$",
- "^Problems with perinatal pathology\\.?$",
- "(?i)^Cases? of Puerperal Convulsions$",
- "(?i)^Operative Gyna?ecology$",
- "(?i)^Mind the gap\\!?\\:?$",
- "^Chronic fatigue syndrome\\.?$",
- "^Cartas? ao editor Letters? to the Editor$",
- "^Note from the Editor$",
- "^Anesthesia Abstract$",
- "^Annual report$",
- "(?i)^Graph and Table of Infectious Diseases?$",
- "^Presentation$",
- "(?i)^Reviews and Information on Publications$",
- "(?i)^Adrese autora$",
- "(?i)^Systematic Part .*\\. Catalogus Fossilium Austriae, Band 2: Echinoidea neogenica$",
- "(?i)^Acknowledgement to Referees$",
- "(?i)^Behçet's disease\\.?$",
- "(?i)^Isolation and identification of restriction endonuclease.*$",
- "(?i)^Screening for abdominal aortic aneurysms?\\.?$",
- "^Event management$",
- "(?i)^Breakfast and Crohn's disease.*\\.?$",
- "^Cálculo de concentraciones en disoluciones acuosas. Ejercicio interactivo\\..*\\.$",
- "(?i)^Genetic and functional analyses of SHANK2 mutations suggest a multiple hit model of Autism spectrum disorders?\\.?$",
- "^Gushi hakubutsugaku$",
- "^Starobosanski nadpisi u Bosni i Hercegovini \\(.*\\)$",
- "^Intestinal spirocha?etosis$",
- "^Treatment of Rodent Ulcer$",
- "(?i)^\\W*Cloud Computing\\W*$",
- "^Compendio mathematico : en que se contienen todas las materias mas principales de las Ciencias que tratan de la cantidad$",
- "^Free Communications, Poster Presentations: Session [A-F]$",
- "^“The Historical Aspects? of Quackery\\.?”$",
- "^A designated centre for people with disabilities operated by St John of God Community Services (Limited|Ltd), Louth$",
- "^P(er|re)-Mile Premiums for Auto Insurance\\.?$",
- "(?i)^Case Report$",
- "^Boletín Informativo$",
- "(?i)^Glioblastoma Multiforme$",
- "(?i)^Nuevos táxones animales descritos en la península Ibérica y Macaronesia desde 1994 \\(.*\\)$",
- "^Zaměstnanecké výhody$",
- "(?i)^The Economics of Terrorism and Counter-Terrorism: A Survey \\(Part .*\\)$",
- "(?i)^Carotid body tumours?\\.?$",
- "(?i)^\\[Españoles en Francia : La condición Emigrante.*\\]$",
- "^Avant-propos$",
- "(?i)^St\\. Patrick's Cathedral, Dublin, County Dublin - Head(s)? and Capital(s)?$",
- "(?i)^St\\. Patrick's Cathedral, Dublin, County Dublin - Bases?$",
- "^Viñetas de Cortázar$",
- "(?i)^Search for heavy neutrinos and W(\\[|_|\\(|_\\{|-)?R(\\]|\\)|\\})? bosons with right-handed couplings in a left-right symmetric model in pp collisions at.*TeV(\\.)?$",
- "(?i)^Measurement of the pseudorapidity and centrality dependence of the transverse energy density in Pb(-?)Pb collisions at.*tev(\\.?)$",
- "(?i)^Search for resonances decaying into top-quark pairs using fully hadronic decays in pp collisions with ATLAS at.*TeV$",
- "(?i)^Search for neutral minimal supersymmetric standard model Higgs bosons decaying to tau pairs in pp collisions at.*tev$",
- "(?i)^Relatório de Estágio (de|em) Angiologia e Cirurgia Vascular$",
- "^Aus der AGMB$",
- "^Znanstveno-stručni prilozi$",
- "(?i)^Zhodnocení finanční situace podniku a návrhy na zlepšení$",
- "(?i)^Evaluation of the Financial Situation in the Firm and Proposals to its Improvement$",
- "(?i)^Hodnocení finanční situace podniku a návrhy na její zlepšení$",
- "^Finanční analýza podniku$",
- "^Financial analysis( of business)?$",
- "(?i)^Textbook of Gyn(a)?(Æ)?(e)?cology$",
- "^Jikken nihon shūshinsho$",
- "(?i)^CORONER('|s)(s|') INQUESTS$",
- "(?i)^(Μελέτη παραγόντων )?risk management( για ανάπτυξη και εφαρμογή ενός πληροφοριακού συστήματος| και ανάπτυξη συστήματος)?$",
- "(?i)^Consultants' contract(s)?$",
- "(?i)^Upute autorima$",
- "(?i)^Bijdrage tot de Kennis van den Godsdienst der Dajaks van Lan(d|f)ak en Tajan$",
- "^Joshi shin kokubun$",
- "^Kōtō shōgaku dokuhon nōson'yō$",
- "^Jinjō shōgaku shōka$",
- "^Shōgaku shūjichō$",
- "^Nihon joshi dokuhon$",
- "^Joshi shin dokuhon$",
- "^Chūtō kanbun dokuhon$",
- "^Wabun dokuhon$",
- "(?i)^(Analysis of economy selected village or town|Rozbor hospodaření vybrané obce či města)$",
- "(?i)^cardiac rehabilitation$",
- "(?i)^Analytical summary$",
- "^Thesaurus resolutionum Sacrae Congregationis Concilii$",
- "(?i)^Sumario analítico(\\s{1})?(Analitic summary)?$",
- "^Prikazi i osvrti$",
- "^Rodinný dům s provozovnou$",
- "^Family house with an establishment$",
- "^Shinsei chūtō shin kokugun$",
- "^Pulmonary alveolar proteinosis(\\.?)$",
- "^Shinshū kanbun$",
- "^Viñeta(s?) de Rodríguez$",
- "^A Matching Model of the Academic Publication Market$",
- "^Yōgaku kōyō$",
- "^Internetový marketing$",
- "^Internet marketing$",
- "^Chūtō kokugo dokuhon$",
- "^Kokugo dokuhon$",
- "^Antibiotic Cover for Dental Extraction(s?)$",
- "^Strategie podniku$",
- "^Strategy of an Enterprise$",
- "(?i)^respiratory disease(s?)(\\.?)$",
- "^Award(s?) for Gallantry in Civil Defence$",
- "^Podniková kultura$",
- "^Corporate Culture$",
- "^Severe hyponatraemia in hospital inpatient(s?)(\\.?)$",
- "^Pracovní motivace$",
- "^Work Motivation$",
- "^Kaitei kōtō jogaku dokuhon$",
- "^Konsolidovaná účetní závěrka$",
- "^Consolidated Financial Statements$",
- "(?i)^intracranial tumour(s?)$",
- "^Climate Change Mitigation Options and Directed Technical Change: A Decentralized Equilibrium Analysis$",
- "^\\[CERVECERIAS MAHOU(\\.|\\:) INTERIOR\\] \\[Material gráfico\\]$",
- "^Housing Market Dynamics(\\:|\\.) On the Contribution of Income Shocks and Credit Constraint(s?)$",
- "^\\[Funciones auxiliares de la música en Radio París,.*\\]$",
- "^Úroveň motivačního procesu jako způsobu vedení lidí$",
- "^The level of motivation process as a leadership$",
- "^Pay-beds in N(\\.?)H(\\.?)S(\\.?) Hospitals$",
- "(?i)^news and events$",
- "^Sansū no gakushū$",
- "^Posouzení informačního systému firmy a návrh změn$",
- "^Information System Assessment and Proposal for ICT Modification$",
- "^Stresové zatížení pracovníků ve vybrané profesi$",
- "^Stress load in a specific job$",
- "^Sunday: Poster Sessions, Pt.*$",
- "^Monday: Poster Sessions, Pt.*$",
- "^Wednesday: Poster Sessions, Pt.*",
- "^Tuesday: Poster Sessions, Pt.*$",
- "^Analýza reklamy$",
- "^Analysis of advertising$",
- "^Shōgaku shūshinsho$",
- "^Shōgaku sansū$",
- "^Shintei joshi kokubun$",
- "^Taishō joshi kokubun dokuhon$",
- "^Joshi kokubun$",
- "^Účetní uzávěrka a účetní závěrka v ČR$",
- "(?i)^The \"?Causes\"? of Cancer$",
- "^Normas para la publicación de artículos$",
- "^Editor('|s)(s|') [Rr]eply$",
- "^Editor(’|s)(s|’) letter$",
- "^Redaktoriaus žodis$",
- "^Kōtō shōgaku shūshinsho jidōyō$",
- "^Shōgaku nihon rekishi$",
- "^(Theory of the flow of action currents in isolated myelinated nerve fibers).*$",
- "^Préface$",
- "^Occupational [Hh]ealth [Ss]ervices.$",
- "^In Memoriam Professor Toshiyuki TAKESHIMA$",
- "^Účetní závěrka ve vybraném podniku.*$",
- "^Financial statements in selected company$",
- "^Abdominal [Aa]ortic [Aa]neurysms.*$",
- "^Pseudomyxoma peritonei$",
- "^Kazalo autora$",
- "(?i)^uvodna riječ$",
- "^Motivace jako způsob vedení lidí$",
- "^Motivation as a leadership$",
- "^Polyfunkční dům$",
- "^Multi\\-funkcional building$",
- "^Podnikatelský plán$",
- "(?i)^Podnikatelský záměr$",
- "(?i)^Business Plan$",
- "^Oceňování nemovitostí$",
- "^Marketingová komunikace$",
- "^Marketing communication$",
- "^Sumario Analítico$",
- "^Riječ uredništva$",
- "^Savjetovanja i priredbe$",
- "^Índice$",
- "^(Starobosanski nadpisi).*$",
- "^Vzdělávání pracovníků v organizaci$",
- "^Staff training in organization$",
- "^(Life Histories of North American Geometridae).*$",
- "^Strategická analýza podniku$",
- "^Strategic Analysis of an Enterprise$",
- "^Sadržaj$",
- "^Upute suradnicima$",
- "^Rodinný dům$",
- "(?i)^Fami(l)?ly house$",
- "^Upute autorima$",
- "^Strategic Analysis$",
- "^Finanční analýza vybraného podniku$",
- "^Finanční analýza$",
- "^Riječ urednika$",
- "(?i)^Content(s?)$",
- "(?i)^Inhalt$",
- "^Jinjō shōgaku shūshinsho jidōyō$",
- "(?i)^Index$",
- "^Chūgaku kokubun kyōkasho$",
- "^Retrato de una mujer$",
- "^Retrato de un hombre$",
- "^Kōtō shōgaku dokuhon$",
- "^Shotōka kokugo$",
- "^Shōgaku dokuhon$",
- "^Jinjō shōgaku kokugo dokuhon$",
- "^Shinsei kokugo dokuhon$",
- "^Teikoku dokuhon$",
- "^Instructions to Authors$",
- "(?i)^Presentación$",
- "^İçindekiler$",
- "(?i)^Tabl?e of contents$",
- "^Editorial( Board)?$",
- "(?i)^Editorial \\(English\\)$",
- "^Editörden$",
- "^(Corpus Oral Dialectal \\(COD\\)\\.).*$",
- "^(Kiri Karl Morgensternile).*$",
- "^(\\[Eksliibris Aleksandr).*\\]$",
- "^(\\[Eksliibris Aleksandr).*$",
- "^(Eksliibris Aleksandr).*$",
- "^(Kiri A\\. de Vignolles).*$",
- "^(2 kirja Karl Morgensternile).*$",
- "^(Pirita kloostri idaosa arheoloogilised).*$",
- "^(Kiri tundmatule).*$",
- "^(Kiri Jenaer Allgemeine Literaturzeitung toimetusele).*$",
- "^(Eksliibris Nikolai Birukovile).*$",
- "^(Eksliibris Nikolai Issakovile).*$",
- "^(WHP Cruise Summary Information of section).*$",
- "^(Measurement of the top quark\\-pair production cross section with ATLAS in pp collisions at).*$",
- "^(Measurement of the spin\\-dependent structure function).*",
- "(?i)^.*authors['’′]? reply\\.?$",
- "(?i)^.*authors['’′]? response\\.?$"
- ]
- },
- "synonyms": {}
- }
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/pom.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/pom.xml
deleted file mode 100644
index 188e0debcf..0000000000
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/pom.xml
+++ /dev/null
@@ -1,79 +0,0 @@
- dhp-workflows
- eu.dnetlib.dhp
- 1.2.4-SNAPSHOT
- 4.0.0
- dhp-graph-provision-scholexplorer
- net.alchim31.maven
- scala-maven-plugin
- 4.0.1
- scala-compile-first
- initialize
- add-source
- compile
- scala-test-compile
- process-test-resources
- testCompile
- ${scala.version}
- org.apache.spark
- spark-core_2.11
- org.apache.spark
- spark-sql_2.11
- eu.dnetlib.dhp
- dhp-common
- ${project.version}
- org.apache.httpcomponents
- httpmime
- org.elasticsearch
- elasticsearch-hadoop
- org.apache.httpcomponents
- httpclient
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala
deleted file mode 100644
index b71b7f0549..0000000000
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala
+++ /dev/null
@@ -1,425 +0,0 @@
-package eu.dnetlib.dhp.export
-import com.fasterxml.jackson.databind.ObjectMapper
-import java.time.LocalDateTime
-import java.time.format.DateTimeFormatter
-import eu.dnetlib.dhp.common.PacePerson
-import eu.dnetlib.dhp.schema.action.AtomicAction
-import eu.dnetlib.dhp.schema.common.ModelConstants
-import eu.dnetlib.dhp.schema.oaf.{Author, Dataset, ExternalReference, Field, Instance, KeyValue, Oaf, Publication, Qualifier, Relation, Result, StructuredProperty}
-import eu.dnetlib.dhp.utils.DHPUtils
-import org.apache.commons.lang3.StringUtils
-import scala.collection.JavaConverters._
-case class DLIExternalReference(id: String, url: String, sitename: String, label: String, pid: String, classId: String) {}
-object DLIToOAF {
- val collectedFromMap: Map[String, KeyValue] = Map(
- "dli_________::r3d100010527" -> generateKeyValue("10|re3data_____::c2a591f440598b63d854556beaf01591", "European Nucleotide Archive"),
- "dli_________::r3d100010255" -> generateKeyValue("10|re3data_____::480d275ed6f9666ee76d6a1215eabf26", "Inter-university Consortium for Political and Social Research"),
- "dli_________::r3d100011868" -> generateKeyValue("10|re3data_____::db814dc656a911b556dba42a331cebe9", "Mendeley Data"),
- "dli_________::elsevier" -> generateKeyValue("10|openaire____::8f87e10869299a5fe80b315695296b88", "Elsevier"),
- "dli_________::openaire" -> generateKeyValue("10|infrastruct_::f66f1bd369679b5b077dcdf006089556", "OpenAIRE"),
- "dli_________::thomsonreuters" -> generateKeyValue("10|openaire____::081b82f96300b6a6e3d282bad31cb6e2", "Crossref"),
- "dli_________::r3d100010216" -> generateKeyValue("10|re3data_____::0fd79429de04343dbbec705d9b5f429f", "4TU.Centre for Research Data"),
- "dli_________::r3d100010134" -> generateKeyValue("10|re3data_____::9633d1e8c4309c833c2c442abeb0cfeb", "PANGAEA"),
- "dli_________::ieee" -> generateKeyValue("10|openaire____::081b82f96300b6a6e3d282bad31cb6e2", "Crossref"),
- "dli_________::r3d100010197" -> generateKeyValue("10|re3data_____::9fd1d79973f7fda60cbe1d82e3819a68", "The Cambridge Structural Database"),
- "dli_________::nature" -> generateKeyValue("10|openaire____::6e380d9cf51138baec8480f5a0ce3a2e", "Springer Nature"),
- "dli_________::datacite" -> generateKeyValue("10|openaire____::9e3be59865b2c1c335d32dae2fe7b254", "Datacite"),
- "dli_________::r3d100010578" -> generateKeyValue("10|re3data_____::c4d751f29a7568011a4c80136b30b444", "IEDA"),
- "dli_________::r3d100010464" -> generateKeyValue("10|re3data_____::23e2a81591099828f6b83a1c83150666", "Research Data Australia"),
- "dli_________::r3d100010327" -> generateKeyValue("10|re3data_____::a644620b81135243dc9acc15d2362246", "Worldwide Protein Data Bank"),
- "dli_________::pubmed" -> generateKeyValue("10|opendoar____::eda80a3d5b344bc40f3bc04f65b7a357", "PubMed Central"),
- "dli_________::europe_pmc__" -> generateKeyValue("10|opendoar____::8b6dd7db9af49e67306feb59a8bdc52c", "Europe PubMed Central"),
- "dli_________::crossref" -> generateKeyValue("10|openaire____::081b82f96300b6a6e3d282bad31cb6e2", "Crossref")
- )
- val relationTypeMapping: Map[String, (String, String)] = Map(
- "IsReferencedBy" -> (ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
- "References" -> (ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
- "IsRelatedTo" -> (ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
- "IsSupplementedBy" -> (ModelConstants.IS_SUPPLEMENTED_BY, ModelConstants.SUPPLEMENT),
- "Documents" -> (ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
- "Cites" -> (ModelConstants.CITES, ModelConstants.CITATION),
- "Unknown" -> (ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
- "IsSourceOf" -> (ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
- "IsCitedBy" -> (ModelConstants.IS_CITED_BY, ModelConstants.CITATION),
- "Reviews" -> (ModelConstants.REVIEWS, ModelConstants.REVIEW),
- "Describes" -> (ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
- "HasAssociationWith" -> (ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP)
- )
- val expectecdPidType = List("uniprot", "ena", "chembl", "ncbi-n", "ncbi-p", "genbank", "pdb", "url")
- val filteredURL = List(
- "",
- "",
- "",
- "",
- "",
- "",
- "",
- "",
- "",
- "",
- "",
- "",
- "",
- "",
- "",
- "",
- "",
- ""
- )
- val rel_inverse: Map[String, String] = Map(
- ModelConstants.IS_RELATED_TO -> ModelConstants.IS_RELATED_TO,
- ModelConstants.IS_SUPPLEMENTED_BY -> ModelConstants.IS_SUPPLEMENT_TO,
- ModelConstants.CITES -> ModelConstants.IS_CITED_BY,
- ModelConstants.IS_CITED_BY -> ModelConstants.CITES,
- ModelConstants.REVIEWS -> ModelConstants.IS_REVIEWED_BY
- )
- val PidTypeMap: Map[String, String] = Map(
- "pbmid" -> "pmid",
- "pmcid" -> "pmc",
- "pmid" -> "pmid",
- "pubmedid" -> "pmid",
- "DOI" -> "doi",
- "doi" -> "doi"
- )
- def fixInstance(r:Publication) :Publication = {
- val collectedFrom = r.getCollectedfrom.asScala.head
- r.getInstance().asScala.foreach(i => i.setCollectedfrom(collectedFrom))
- r
- }
- def fixInstanceDataset(r:Dataset) :Dataset = {
- val collectedFrom = r.getCollectedfrom.asScala.head
- r.getInstance().asScala.foreach(i => i.setCollectedfrom(collectedFrom))
- r
- }
- def toActionSet(item: Oaf): (String, String) = {
- val mapper = new ObjectMapper()
- item match {
- case dataset: Dataset =>
- val a: AtomicAction[Dataset] = new AtomicAction[Dataset]
- a.setClazz(classOf[Dataset])
- a.setPayload(dataset)
- (dataset.getClass.getCanonicalName, mapper.writeValueAsString(a))
- case publication: Publication =>
- val a: AtomicAction[Publication] = new AtomicAction[Publication]
- a.setClazz(classOf[Publication])
- a.setPayload(publication)
- (publication.getClass.getCanonicalName, mapper.writeValueAsString(a))
- case relation: Relation =>
- val a: AtomicAction[Relation] = new AtomicAction[Relation]
- a.setClazz(classOf[Relation])
- a.setPayload(relation)
- (relation.getClass.getCanonicalName, mapper.writeValueAsString(a))
- case _ =>
- null
- }
- }
- def convertClinicalTrial(dataset: DLIDataset): (String, String) = {
- val currentId = generateId(dataset.getId)
- val pids = dataset.getPid.asScala.filter(p => "".equalsIgnoreCase(p.getQualifier.getClassname)).map(p => s"50|r3111dacbab5::${DHPUtils.md5(p.getValue.toLowerCase())}")
- if (pids.isEmpty)
- null
- else
- (currentId, pids.head)
- }
- def insertExternalRefs(publication: Publication, externalReferences: List[DLIExternalReference]): Publication = {
- val eRefs = => {
- val result = new ExternalReference()
- result.setSitename(e.sitename)
- result.setLabel(e.label)
- result.setUrl(e.url)
- result.setRefidentifier(
- result.setDataInfo(generateDataInfo())
- result.setQualifier(createQualifier(e.classId, ModelConstants.DNET_EXTERNAL_REFERENCE_TYPE))
- result
- })
- publication.setExternalReference(eRefs.asJava)
- publication
- }
- def filterPid(p: StructuredProperty): Boolean = {
- if (expectecdPidType.contains(p.getQualifier.getClassname) && p.getQualifier.getClassname.equalsIgnoreCase("url"))
- if (filteredURL.exists(u => p.getValue.contains(u)))
- return true
- else
- return false
- expectecdPidType.contains(p.getQualifier.getClassname)
- }
- def extractTitle(titles: java.util.List[StructuredProperty]): String = {
- if (titles == null)
- return null
- val label = => p.getValue).find(p => p.nonEmpty)
- label.orNull
- }
- def convertDLIDatasetToExternalReference(dataset: DLIDataset): DLIExternalReference = {
- val pids = dataset.getPid.asScala.filter(filterPid)
- if (pids == null || pids.isEmpty)
- return null
- val pid: StructuredProperty = pids.head
- pid.getQualifier.getClassname match {
- case "uniprot" => DLIExternalReference(generateId(dataset.getId), s"${pid.getValue}", "UniProt", extractTitle(dataset.getTitle), pid.getValue, "accessionNumber")
- case "ena" =>
- if (pid.getValue != null && pid.getValue.nonEmpty && pid.getValue.length > 7)
- DLIExternalReference(generateId(dataset.getId), s"${pid.getValue.substring(0, 8)}", "European Nucleotide Archive", extractTitle(dataset.getTitle), pid.getValue, "accessionNumber")
- else
- null
- case "chembl" => DLIExternalReference(generateId(dataset.getId), s"${pid.getValue}", "ChEMBL", extractTitle(dataset.getTitle), pid.getValue, "accessionNumber")
- case "ncbi-n" => DLIExternalReference(generateId(dataset.getId), s"${pid.getValue}", "Nucleotide Database", extractTitle(dataset.getTitle), pid.getValue, "accessionNumber")
- case "ncbi-p" => DLIExternalReference(generateId(dataset.getId), s"${pid.getValue}", "Nucleotide Database", extractTitle(dataset.getTitle), pid.getValue, "accessionNumber")
- case "genbank" => DLIExternalReference(generateId(dataset.getId), s"${pid.getValue}", "GenBank", extractTitle(dataset.getTitle), pid.getValue, "accessionNumber")
- case "pdb" => DLIExternalReference(generateId(dataset.getId), s"${pid.getValue}", "Protein Data Bank", extractTitle(dataset.getTitle), pid.getValue, "accessionNumber")
- case "url" => DLIExternalReference(generateId(dataset.getId), pid.getValue, "", extractTitle(dataset.getTitle), pid.getValue, "url")
- }
- }
- def convertDLIPublicationToOAF(inputPublication: DLIPublication): Publication = {
- val result = new Publication
- val cleanedPids = inputPublication.getPid.asScala.filter(p => PidTypeMap.contains(p.getQualifier.getClassid))
- .map(p => {
- p.setQualifier(createQualifier(PidTypeMap(p.getQualifier.getClassid), p.getQualifier.getSchemeid))
- p
- })
- if (cleanedPids.isEmpty)
- return null
- result.setId(generateId(inputPublication.getId))
- result.setDataInfo(generateDataInfo(invisible = true))
- if (inputPublication.getCollectedfrom == null || inputPublication.getCollectedfrom.size() == 0 || (inputPublication.getCollectedfrom.size() == 1 && inputPublication.getCollectedfrom.get(0) == null))
- return null
- result.setCollectedfrom( => collectedFromMap.getOrElse(c.getKey, null)).filter(p => p != null).asJava)
- if(result.getCollectedfrom.isEmpty)
- return null
- result.setPid(cleanedPids.asJava)
- result.setDateofcollection(inputPublication.getDateofcollection)
- result.setOriginalId( => p.getValue).asJava)
- result.setDateoftransformation("yyyy-MM-dd'T'HH:mm:ss'Z'")))
- if (inputPublication.getAuthor == null || inputPublication.getAuthor.isEmpty)
- return null
- result.setAuthor(
- result.setResulttype(createQualifier(inputPublication.getResulttype.getClassid, inputPublication.getResulttype.getClassname, ModelConstants.DNET_RESULT_TYPOLOGIES, ModelConstants.DNET_RESULT_TYPOLOGIES))
- if (inputPublication.getSubject != null)
- result.setSubject(
- if (inputPublication.getTitle == null || inputPublication.getTitle.isEmpty)
- return null
- result.setTitle(List(patchTitle(inputPublication.getTitle.get(0))).asJava)
- if (inputPublication.getRelevantdate == null || inputPublication.getRelevantdate.size() == 0)
- return null
- result.setRelevantdate(
- result.setDescription(inputPublication.getDescription)
- result.setDateofacceptance(asField(inputPublication.getRelevantdate.get(0).getValue))
- result.setPublisher(inputPublication.getPublisher)
- result.setSource(inputPublication.getSource)
- result.setBestaccessright(createAccessRight(ModelConstants.UNKNOWN, ModelConstants.NOT_AVAILABLE, ModelConstants.DNET_ACCESS_MODES, ModelConstants.DNET_ACCESS_MODES))
- val dois = result.getPid.asScala.filter(p => "doi".equalsIgnoreCase(p.getQualifier.getClassname)).map(p => p.getValue)
- if (dois.isEmpty)
- return null
- val i: Instance = createInstance(s"${dois.head}", firstInstanceOrNull(inputPublication.getInstance()), result.getDateofacceptance)
- if (i != null)
- result.setInstance(List(i).asJava)
- result
- }
- def convertDLIRelation(r: Relation): Relation = {
- val rt = r.getRelType
- if (!relationTypeMapping.contains(rt))
- return null
- r.setRelType(ModelConstants.RESULT_RESULT)
- r.setRelClass(relationTypeMapping(rt)._1)
- r.setSubRelType(relationTypeMapping(rt)._2)
- r.setSource(generateId(r.getSource))
- r.setTarget(generateId(r.getTarget))
- r
- }
- def convertDLIDatasetTOOAF(d: DLIDataset): Dataset = {
- if (d.getCollectedfrom == null || d.getCollectedfrom.size() == 0 || (d.getCollectedfrom.size() == 1 && d.getCollectedfrom.get(0) == null))
- return null
- val result: Dataset = new Dataset
- result.setId(generateId(d.getId))
- result.setDataInfo(generateDataInfo())
- result.setCollectedfrom( => collectedFromMap.getOrElse(c.getKey, null)).filter(p => p != null).asJava)
- if(result.getCollectedfrom.isEmpty)
- return null
- result.setPid(d.getPid)
- val fpids = result.getPid.asScala.filter(p => "doi".equalsIgnoreCase(p.getQualifier.getClassname) ||
- "pdb".equalsIgnoreCase(p.getQualifier.getClassname)
- ).map(p => p.getValue)
- if (fpids == null || fpids.isEmpty)
- return null
- result.setDateofcollection(d.getDateofcollection)
- result.setOriginalId( => d.getValue).asJava)
- result.setDateoftransformation("yyyy-MM-dd'T'HH:mm:ss'Z'")))
- if (d.getAuthor == null || d.getAuthor.isEmpty)
- return null
- result.setAuthor(
- result.setResulttype(createQualifier(d.getResulttype.getClassid, d.getResulttype.getClassname, ModelConstants.DNET_RESULT_TYPOLOGIES, ModelConstants.DNET_RESULT_TYPOLOGIES))
- if (d.getSubject != null)
- result.setSubject(
- if (d.getTitle == null || d.getTitle.isEmpty)
- return null
- result.setTitle(List(patchTitle(d.getTitle.get(0))).asJava)
- if (d.getRelevantdate == null || d.getRelevantdate.size() == 0)
- return null
- result.setRelevantdate(
- result.setDescription(d.getDescription)
- result.setDateofacceptance(asField(d.getRelevantdate.get(0).getValue))
- result.setPublisher(d.getPublisher)
- result.setSource(d.getSource)
- result.setBestaccessright(createAccessRight(ModelConstants.UNKNOWN, ModelConstants.NOT_AVAILABLE, ModelConstants.DNET_ACCESS_MODES, ModelConstants.DNET_ACCESS_MODES))
- val instance_urls = if (fpids.head.length < 5) s"${fpids.head}" else s"${fpids.head}"
- val i: Instance = createInstance(instance_urls, firstInstanceOrNull(d.getInstance()), result.getDateofacceptance, true)
- // Ticket #6281 added pid to Instance
- i.setPid(result.getPid)
- if (i != null)
- result.setInstance(List(i).asJava)
- result
- }
- def firstInstanceOrNull(instances: java.util.List[Instance]): Instance = {
- if (instances == null || instances.size() == 0)
- return null
- instances.get(0)
- }
- def createInstance(url: String, originalInstance: Instance, doa: Field[String], dataset: Boolean = false): Instance = {
- val i = new Instance
- i.setUrl(List(url).asJava)
- if (dataset)
- i.setInstancetype(createQualifier("0021", "Dataset", ModelConstants.DNET_PUBLICATION_RESOURCE, ModelConstants.DNET_PUBLICATION_RESOURCE))
- else
- i.setInstancetype(createQualifier("0000", "Unknown", ModelConstants.DNET_PUBLICATION_RESOURCE, ModelConstants.DNET_PUBLICATION_RESOURCE))
- if (originalInstance != null && originalInstance.getHostedby != null)
- i.setHostedby(originalInstance.getHostedby)
- i.setAccessright(createAccessRight(ModelConstants.UNKNOWN, ModelConstants.NOT_AVAILABLE, ModelConstants.DNET_ACCESS_MODES, ModelConstants.DNET_ACCESS_MODES))
- i.setDateofacceptance(doa)
- i
- }
- def patchRelevantDate(d: StructuredProperty): StructuredProperty = {
- d.setQualifier(createQualifier(ModelConstants.UNKNOWN, ModelConstants.DNET_DATACITE_DATE))
- d
- }
- def patchTitle(t: StructuredProperty): StructuredProperty = {
- t.setQualifier(ModelConstants.MAIN_TITLE_QUALIFIER)
- t
- }
- def convertSubject(s: StructuredProperty): StructuredProperty = {
- s.setQualifier(createQualifier("keyword", ModelConstants.DNET_SUBJECT_TYPOLOGIES))
- s
- }
- def convertAuthor(a: Author): Author = {
- if (a == null)
- return a
- val p = new PacePerson(a.getFullname, false)
- if (p.isAccurate) {
- a.setName(p.getNameString)
- a.setSurname(p.getSurnameString)
- }
- a
- }
- def generateId(id: String): String = {
- val md5 = if (id.contains("::")) StringUtils.substringAfter(id, "::") else StringUtils.substringAfter(id, "|")
- s"50|scholix_____::$md5"
- }
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala
deleted file mode 100644
index 3f632af226..0000000000
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-package eu.dnetlib.dhp.`export`
-import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
-import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
-import org.apache.hadoop.mapred.SequenceFileOutputFormat
-import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.expressions.Window
-import org.apache.spark.SparkConf
-import scala.collection.mutable.ArrayBuffer
-object SparkExportContentForOpenAire {
- def main(args: Array[String]): Unit = {
- val conf: SparkConf = new SparkConf()
- val parser = new ArgumentApplicationParser(IOUtils.toString(SparkExportContentForOpenAire.getClass.getResourceAsStream("input_export_content_parameters.json")))
- parser.parseArgument(args)
- val spark: SparkSession =
- SparkSession
- .builder()
- .config(conf)
- .appName(SparkExportContentForOpenAire.getClass.getSimpleName)
- .master(parser.get("master")).getOrCreate()
- val workingPath = parser.get("workingDirPath")
- implicit val dliPubEncoder: Encoder[DLIPublication] = Encoders.kryo(classOf[DLIPublication])
- implicit val dliDatEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset])
- implicit val pubEncoder: Encoder[Publication] = Encoders.bean(classOf[Publication])
- implicit val datEncoder: Encoder[OafDataset] = Encoders.bean(classOf[OafDataset])
- implicit val relEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation])
- import spark.implicits._
- val dsRel ="$workingPath/relation_b").as[Relation]
- dsRel.filter(r => r.getDataInfo==null || r.getDataInfo.getDeletedbyinference ==false)
- .map(DLIToOAF.convertDLIRelation)
- .filter(r => r!= null)
- .write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS")
- val dsPubs ="$workingPath/publication").as[DLIPublication]
- dsPubs
- .filter(p=>p.getDataInfo.getDeletedbyinference == false)
- .map(DLIToOAF.convertDLIPublicationToOAF)
- .filter(p=>p!= null)
- .write.mode(SaveMode.Overwrite).save(s"$workingPath/export/publicationDS")
- val dsDataset ="$workingPath/dataset").as[DLIDataset]
- dsDataset
- .filter(p => p.getDataInfo.getDeletedbyinference == false)
- .map(DLIToOAF.convertDLIDatasetTOOAF).filter(p=>p!= null)
- .write.mode(SaveMode.Overwrite).save(s"$workingPath/export/datasetDS")
- val pubs:Dataset[Publication] ="$workingPath/export/publicationDS").as[Publication]
- val dats :Dataset[OafDataset] ="$workingPath/export/datasetDS").as[OafDataset]
- val relDS1 :Dataset[Relation] ="$workingPath/export/relationDS").as[Relation]
- val pub_id ="id").distinct()
- val dat_id ="id").distinct()
- pub_id.joinWith(relDS1, pub_id("id").equalTo(relDS1("source"))).map(k => k._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS_f1")
- val relDS2="$workingPath/export/relationDS_f1").as[Relation]
- relDS2.joinWith(dat_id, relDS2("target").equalTo(dats("id"))).map(k => k._1).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS_filtered")
- val r_source ="source")).distinct()
- val r_target ="target")).distinct()
- val w2 = Window.partitionBy("id").orderBy("lastupdatetimestamp")
- pubs.joinWith(r_source, pubs("id").equalTo(r_source("source")), "inner").map(k => k._1)
- .withColumn("row",row_number.over(w2)).where($"row" === 1).drop("row")
- .write.mode(SaveMode.Overwrite).save(s"$workingPath/export/publicationDS_filtered")
- dats.joinWith(r_target, dats("id").equalTo(r_target("target")), "inner").map(k => k._1)
- .withColumn("row",row_number.over(w2)).where($"row" === 1).drop("row")
- .write.mode(SaveMode.Overwrite).save(s"$workingPath/export/datasetAS")
- => p != null).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/externalReference")
- val pf ="$workingPath/export/publicationDS_filtered").select("id")
- val relDS3 ="$workingPath/export/relationDS").as[Relation]
- val relationTo = pf.joinWith(relDS3, pf("id").equalTo(relDS3("source")),"inner").map(t =>t._2)
- val extRef ="$workingPath/export/externalReference").as[DLIExternalReference]
- spark.createDataset(relationTo.joinWith(extRef, relationTo("target").equalTo(extRef("id")), "inner").map(d => {
- val r = d._1
- val ext = d._2
- (r.getSource, ext)
- }) => {
- var dli_ext = ArrayBuffer[DLIExternalReference]()
- f._2.foreach(d => if (dli_ext.size < 100) dli_ext += d )
- (f._1, dli_ext)
- })).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/externalReference_grouped")
- val pubf :Dataset[Publication] ="$workingPath/export/publicationDS_filtered").as[Publication]
- val groupedERf:Dataset[(String, List[DLIExternalReference])]="$workingPath/export/externalReference_grouped").as[(String, List[DLIExternalReference])]
- groupedERf.joinWith(pubf,pubf("id").equalTo(groupedERf("_1"))).map(t =>
- {
- val publication = t._2
- if (t._1 != null) {
- val eRefs = t._1._2
- DLIToOAF.insertExternalRefs(publication, eRefs)
- } else
- publication
- }
- ).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/publicationAS")
- dsDataset
- .map(DLIToOAF.convertClinicalTrial)
- .filter(p => p != null)
- .write.mode(SaveMode.Overwrite).save(s"$workingPath/export/clinicalTrials")
- val ct:Dataset[(String,String)] ="$workingPath/export/clinicalTrials").as[(String,String)]
- val relDS="$workingPath/export/relationDS_f1").as[Relation]
- relDS.joinWith(ct, relDS("target").equalTo(ct("_1")), "inner")
- .map(k =>{
- val currentRel = k._1
- currentRel.setTarget(k._2._2)
- currentRel
- }).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/clinicalTrialsRels")
- val clRels:Dataset[Relation] ="$workingPath/export/clinicalTrialsRels").as[Relation]
- val rels:Dataset[Relation] ="$workingPath/export/relationDS_filtered").as[Relation]
- rels.union(clRels).flatMap(r => {
- val inverseRel = new Relation
- inverseRel.setSource(r.getTarget)
- inverseRel.setTarget(r.getSource)
- inverseRel.setDataInfo(r.getDataInfo)
- inverseRel.setCollectedfrom(r.getCollectedfrom)
- inverseRel.setRelType(r.getRelType)
- inverseRel.setSubRelType(r.getSubRelType)
- inverseRel.setRelClass(DLIToOAF.rel_inverse(r.getRelClass))
- List(r, inverseRel)
- }).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationAS")
- val fRels:Dataset[(String,String)] ="$workingPath/export/relationAS").as[Relation].map(DLIToOAF.toActionSet)
- val fpubs:Dataset[(String,String)] ="$workingPath/export/publicationAS_fixed").as[Publication].map(DLIToOAF.toActionSet)
- val fdats:Dataset[(String,String)] ="$workingPath/export/datasetAS_fixed").as[OafDataset].map(DLIToOAF.toActionSet)
- fRels.union(fpubs).union(fdats) => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingPath/export/rawset", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
- }
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/zenodo/ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/zenodo/
deleted file mode 100644
index e19432f291..0000000000
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/zenodo/
+++ /dev/null
@@ -1,112 +0,0 @@
-package eu.dnetlib.dhp.export.zenodo;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.common.MakeTarArchive;
-public class MakeTar implements Serializable {
- private static final Logger log = LoggerFactory.getLogger(MakeTar.class);
- public static void main(String[] args) throws Exception {
- String jsonConfiguration = IOUtils
- .toString(
- MakeTar.class
- .getResourceAsStream(
- "/eu/dnetlib/dhp/export/input_maketar_parameters.json"));
- final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
- parser.parseArgument(args);
- final String outputPath = parser.get("targetPath");
-"hdfsPath: {}", outputPath);
