diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
index e5c8a4606..47aab1d20 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
@@ -25,6 +25,7 @@ public class DedupRecordFactory {
 
     public static <T extends OafEntity> Dataset<T> createDedupRecord(
             final SparkSession spark,
+            final DataInfo dataInfo,
             final String mergeRelsInputPath,
             final String entitiesInputPath,
             final Class<T> clazz) {
@@ -67,41 +68,39 @@ public class DedupRecordFactory {
                         Encoders.STRING())
                 .mapGroups(
                         (MapGroupsFunction<String, Tuple2<String, T>, T>)
-                                (key, values) -> entityMerger(key, values, ts, clazz),
+                                (key, values) -> entityMerger(key, values, ts, dataInfo),
                         Encoders.bean(clazz));
     }
 
     private static <T extends OafEntity> T entityMerger(
-            String id, Iterator<Tuple2<String, T>> entities, long ts, Class<T> clazz) {
-        try {
-            T entity = clazz.newInstance();
-            entity.setId(id);
-            entity.setDataInfo(new DataInfo());
-            entity.getDataInfo().setTrust("0.9");
-            entity.setLastupdatetimestamp(ts);
+            String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo) {
 
-            final Collection<String> dates = Lists.newArrayList();
-            entities.forEachRemaining(
-                    t -> {
-                        T duplicate = t._2();
-                        entity.mergeFrom(duplicate);
-                        if (ModelSupport.isSubClass(duplicate, Result.class)) {
-                            Result r1 = (Result) duplicate;
-                            Result er = (Result) entity;
-                            er.setAuthor(DedupUtility.mergeAuthor(er.getAuthor(), r1.getAuthor()));
+        T entity = entities.next()._2();
 
-                            if (er.getDateofacceptance() != null) {
-                                dates.add(r1.getDateofacceptance().getValue());
-                            }
+        final Collection<String> dates = Lists.newArrayList();
+        entities.forEachRemaining(
+                t -> {
+                    T duplicate = t._2();
+                    entity.mergeFrom(duplicate);
+                    if (ModelSupport.isSubClass(duplicate, Result.class)) {
+                        Result r1 = (Result) duplicate;
+                        Result er = (Result) entity;
+                        er.setAuthor(DedupUtility.mergeAuthor(er.getAuthor(), r1.getAuthor()));
+
+                        if (r1.getDateofacceptance() != null) {
+                            dates.add(r1.getDateofacceptance().getValue());
                         }
-                    });
+                    }
+                });
 
-            if (ModelSupport.isSubClass(entity, Result.class)) {
-                ((Result) entity).setDateofacceptance(DatePicker.pick(dates));
-            }
-            return entity;
-        } catch (IllegalAccessException | InstantiationException e) {
-            throw new RuntimeException(e);
+        if (ModelSupport.isSubClass(entity, Result.class)) {
+            ((Result) entity).setDateofacceptance(DatePicker.pick(dates));
         }
+
+        entity.setId(id);
+        entity.setLastupdatetimestamp(ts);
+        entity.setDataInfo(dataInfo);
+
+        return entity;
     }
 }
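Note: the refactored `entityMerger` seeds the root record from the first duplicate instead of calling `clazz.newInstance()` reflectively, folds the remaining duplicates in, and only then stamps id, timestamp, and the caller-supplied `DataInfo`, so `mergeFrom` can no longer clobber them. Below is a minimal sketch of that ordering with a stand-in `Entity` type (`mergeFrom`, the field names, and the example id are placeholders, not the real `OafEntity` API):

```java
import java.util.Iterator;
import java.util.List;

class Entity {
    String id;
    String trust;
    long lastUpdate;
    void mergeFrom(Entity other) { /* field-by-field merge, elided */ }
}

public class MergeSketch {
    // Seed the root from the first duplicate (assumes a non-empty iterator,
    // as the patch does), fold the rest in, then stamp the bookkeeping fields last.
    static Entity merge(String rootId, Iterator<Entity> duplicates, long ts, String trust) {
        Entity root = duplicates.next();              // no reflective newInstance() needed
        duplicates.forEachRemaining(root::mergeFrom); // accumulate the remaining duplicates
        root.id = rootId;                             // applied after merging, so it cannot be overwritten
        root.lastUpdate = ts;
        root.trust = trust;                           // caller-provided, replacing the hard-coded "0.9"
        return root;
    }

    public static void main(String[] args) {
        List<Entity> dups = List.of(new Entity(), new Entity());
        Entity root = merge("20|dedup_wf_001::abc", dups.iterator(),
                System.currentTimeMillis(), "0.8");
        System.out.println(root.id);
    }
}
```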
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
index c46464ffd..42a0cff8a 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
@@ -3,7 +3,9 @@ package eu.dnetlib.dhp.oa.dedup;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.common.EntityType;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@@ -21,6 +23,10 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
 
     private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
 
+    public static final String ROOT_TRUST = "0.8";
+    public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup";
+    public static final String PROVENANCE_ACTIONS = "dnet:provenanceActions";
+
     public SparkCreateDedupRecord(ArgumentApplicationParser parser, SparkSession spark) {
         super(parser, spark);
     }
@@ -67,13 +73,30 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
                     DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
             final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
 
-            Class<OafEntity> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
-
-            DedupRecordFactory.createDedupRecord(spark, mergeRelPath, entityPath, clazz)
+            final Class<OafEntity> clazz =
+                    ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
+            final DataInfo dataInfo = getDataInfo(dedupConf);
+            DedupRecordFactory.createDedupRecord(spark, dataInfo, mergeRelPath, entityPath, clazz)
                     .write()
                     .mode(SaveMode.Overwrite)
                     .option("compression", "gzip")
                     .json(outputPath);
         }
     }
+
+    private static DataInfo getDataInfo(DedupConfig dedupConf) {
+        DataInfo info = new DataInfo();
+        info.setDeletedbyinference(false);
+        info.setInferred(true);
+        info.setInvisible(false);
+        info.setTrust(ROOT_TRUST);
+        info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
+        Qualifier provenance = new Qualifier();
+        provenance.setClassid(PROVENANCE_ACTION_CLASS);
+        provenance.setClassname(PROVENANCE_ACTION_CLASS);
+        provenance.setSchemeid(PROVENANCE_ACTIONS);
+        provenance.setSchemename(PROVENANCE_ACTIONS);
+        info.setProvenanceaction(provenance);
+        return info;
+    }
 }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
index 4baac0229..b89a0e7e2 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
@@ -25,7 +25,7 @@ public class ConnectedComponent implements Serializable {
         if (docIds.size() > 1) {
             final String s = getMin();
             String prefix = s.split("\\|")[0];
-            ccId = prefix + "|dedup_______::" + DedupUtility.md5(s);
+            ccId = prefix + "|dedup_wf_001::" + DedupUtility.md5(s);
             return ccId;
         } else {
             return docIds.iterator().next();
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/ConnectedComponent.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/ConnectedComponent.java
index 41d53944f..a5aa94e09 100644
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/ConnectedComponent.java
+++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/ConnectedComponent.java
@@ -25,7 +25,7 @@ public class ConnectedComponent implements Serializable {
         if (docIds.size() > 1) {
             final String s = getMin();
             String prefix = s.split("\\|")[0];
-            ccId = prefix + "|dedup_______::" + DedupUtility.md5(s);
+            ccId = prefix + "|dedup_wf_001::" + DedupUtility.md5(s);
             return ccId;
         } else {
             return docIds.iterator().next();
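Note: both `ConnectedComponent` variants now mint root ids under the `dedup_wf_001` namespace, built from the entity-type prefix of the smallest id in the component plus an MD5 of that id. A hedged sketch of that id construction follows; `DedupUtility.md5` is approximated here with `java.security.MessageDigest` (the real helper may differ), and the example document id is made up:

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class DedupIdSketch {
    // Plain MD5-to-hex, standing in for DedupUtility.md5.
    static String md5(String s) throws Exception {
        MessageDigest md = MessageDigest.getInstance("MD5");
        byte[] digest = md.digest(s.getBytes(StandardCharsets.UTF_8));
        return String.format("%032x", new BigInteger(1, digest)); // zero-padded hex
    }

    public static void main(String[] args) throws Exception {
        String minDocId = "50|doi_________::0123456789abcdef"; // smallest id in the component (hypothetical)
        String prefix = minDocId.split("\\|")[0];               // "50", the entity-type prefix
        // "dedup_wf_001" replaces the previous "dedup_______" namespace
        String ccId = prefix + "|dedup_wf_001::" + md5(minDocId);
        System.out.println(ccId);
    }
}
```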
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJob.java
index d0fe95289..bec3810f9 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJob.java
@@ -19,6 +19,8 @@ public class GraphHiveImporterJob {
 
     private static final Logger log = LoggerFactory.getLogger(GraphHiveImporterJob.class);
 
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     public static void main(String[] args) throws Exception {
 
         final ArgumentApplicationParser parser =
@@ -37,12 +39,12 @@ public class GraphHiveImporterJob {
         String inputPath = parser.get("inputPath");
         log.info("inputPath: {}", inputPath);
 
-        String hiveMetastoreUris = parser.get("hiveMetastoreUris");
-        log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
-
         String hiveDbName = parser.get("hiveDbName");
         log.info("hiveDbName: {}", hiveDbName);
 
+        String hiveMetastoreUris = parser.get("hiveMetastoreUris");
+        log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
+
         SparkConf conf = new SparkConf();
         conf.set("hive.metastore.uris", hiveMetastoreUris);
 
@@ -58,13 +60,13 @@ public class GraphHiveImporterJob {
                     spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", hiveDbName));
                     spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName));
 
-                    final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+                    final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
                     // Read the input file and convert it into RDD of serializable object
                     ModelSupport.oafTypes.forEach(
                             (name, clazz) ->
                                     spark.createDataset(
                                                     sc.textFile(inputPath + "/" + name)
-                                                            .map(s -> new ObjectMapper().readValue(s, clazz))
+                                                            .map(s -> OBJECT_MAPPER.readValue(s, clazz))
                                                             .rdd(),
                                             Encoders.bean(clazz))
                                             .write()
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/config-default.xml
index 8d8766283..9608732ed 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/config-default.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/config-default.xml
@@ -12,19 +12,15 @@
         <value>true</value>
     </property>
     <property>
-        <name>oozie.action.sharelib.for.spark</name>
-        <value>spark2</value>
-    </property>
-    <property>
-        <name>hive_metastore_uris</name>
+        <name>hiveMetastoreUris</name>
         <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
     </property>
     <property>
-        <name>hive_jdbc_url</name>
+        <name>hiveJdbcUrl</name>
         <value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
     </property>
     <property>
-        <name>hive_db_name</name>
+        <name>hiveDbName</name>
         <value>openaire</value>
     </property>
 </configuration>
\ No newline at end of file
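Note: `GraphHiveImporterJob` now reuses a single static `ObjectMapper` instead of allocating one per record inside the `map` lambda; Jackson's `ObjectMapper` is thread-safe for `readValue` once configured, so hoisting it avoids per-element construction cost. A small sketch of the difference, assuming jackson-databind on the classpath and a hypothetical `Record` bean:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;

public class JsonParseSketch {
    // One shared, pre-configured instance; safe to reuse across calls/threads.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public static class Record {
        public String id;
        public Record() {}
    }

    public static void main(String[] args) throws Exception {
        List<String> lines = List.of("{\"id\":\"a\"}", "{\"id\":\"b\"}");
        for (String line : lines) {
            // was: new ObjectMapper().readValue(line, Record.class) on every element
            Record r = OBJECT_MAPPER.readValue(line, Record.class);
            System.out.println(r.id);
        }
    }
}
```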
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/lib/scripts/postprocessing.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/lib/scripts/postprocessing.sql
index c92f8d1af..6c49679cd 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/lib/scripts/postprocessing.sql
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/lib/scripts/postprocessing.sql
@@ -1,10 +1,10 @@
-DROP VIEW IF EXISTS ${hive_db_name}.result;
+DROP VIEW IF EXISTS ${hiveDbName}.result;
 
 CREATE VIEW IF NOT EXISTS result as
-    select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.publication p
+    select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, instance from ${hiveDbName}.publication p
         union all
-    select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.dataset d
+    select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hiveDbName}.dataset d
         union all
-    select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.software s
+    select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hiveDbName}.software s
        union all
-    select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.otherresearchproduct o;
+    select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hiveDbName}.otherresearchproduct o;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml
index 67ca6a64a..dc1fa092d 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml
@@ -2,13 +2,21 @@
     <parameters>
         <property>
-            <name>sourcePath</name>
+            <name>inputPath</name>
             <description>the source path</description>
         </property>
         <property>
-            <name>hive_db_name</name>
+            <name>hiveDbName</name>
             <description>the target hive database name</description>
         </property>
+        <property>
+            <name>hiveJdbcUrl</name>
+            <description>hive server jdbc url</description>
+        </property>
+        <property>
+            <name>hiveMetastoreUris</name>
+            <description>hive server metastore URIs</description>
+        </property>
         <property>
             <name>sparkDriverMemory</name>
             <description>memory for driver process</description>
@@ -87,9 +95,9 @@
             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
             </spark-opts>
-            <arg>--sourcePath</arg><arg>${sourcePath}</arg>
-            <arg>--hive_db_name</arg><arg>${hive_db_name}</arg>
-            <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
+            <arg>--inputPath</arg><arg>${inputPath}</arg>
+            <arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
+            <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
         </spark>
@@ -102,12 +110,12 @@
                 <configuration>
                     <property>
                         <name>hive.metastore.uris</name>
-                        <value>${hive_metastore_uris}</value>
+                        <value>${hiveMetastoreUris}</value>
                     </property>
                 </configuration>
-                <jdbc-url>${hive_jdbc_url}/${hive_db_name}</jdbc-url>
+                <jdbc-url>${hiveJdbcUrl}/${hiveDbName}</jdbc-url>
                 <script>lib/scripts/postprocessing.sql</script>
-                <param>hive_db_name=${hive_db_name}</param>
+                <param>hiveDbName=${hiveDbName}</param>
             </hive2>
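Note: the rename from snake_case (`hive_db_name`) to camelCase (`hiveDbName`) has to be applied consistently across config-default.xml, the workflow `<arg>` list, the SQL `${...}` placeholders, and the keys the Spark jobs read; a mismatch fails only at runtime, when the parser returns null. A trivial illustration with a stand-in map in place of `ArgumentApplicationParser` (not the real class):

```java
import java.util.HashMap;
import java.util.Map;

public class ParamSketch {
    public static void main(String[] args) {
        // Stand-in for the parsed <arg>--hiveDbName</arg><arg>${hiveDbName}</arg> pairs.
        Map<String, String> parsed = new HashMap<>();
        parsed.put("inputPath", "/tmp/graph");
        parsed.put("hiveDbName", "openaire");
        parsed.put("hiveMetastoreUris", "thrift://example:9083");

        // parsed.get("hive_db_name") would now return null after the rename.
        String hiveDbName = parsed.get("hiveDbName");
        System.out.println("target db: " + hiveDbName);
    }
}
```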
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java
index 20786582f..eaa18ad0b 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java
@@ -216,6 +216,7 @@ public class CreateRelatedEntitiesJob_phase2 {
                         (MapFunction<String, E>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
                         Encoders.bean(entityClazz))
+                .filter("dataInfo.invisible == false")
                 .map(
                         (MapFunction) value ->
diff --git a/pom.xml b/pom.xml
index f4dfc2c0d..25e84a424 100644
--- a/pom.xml
+++ b/pom.xml
@@ -292,6 +292,12 @@
                 <groupId>eu.dnetlib</groupId>
                 <artifactId>dnet-actionmanager-common</artifactId>
                 <version>6.0.5</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.hadoop</groupId>
+                        <artifactId>hadoop-common</artifactId>
+                    </exclusion>
+                </exclusions>
             </dependency>
             <dependency>
                 <groupId>eu.dnetlib</groupId>
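Note: the added `.filter("dataInfo.invisible == false")` drops invisible records right after deserialization, before the map phase; `Dataset.filter(String)` takes a SQL expression and can address nested bean fields. A self-contained sketch under assumed, simplified bean types (the real `OafEntity`/`DataInfo` classes carry many more fields):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;

public class InvisibleFilterSketch {
    public static class Info implements Serializable {
        private boolean invisible;
        public boolean isInvisible() { return invisible; }
        public void setInvisible(boolean v) { invisible = v; }
    }

    public static class Entity implements Serializable {
        private String id;
        private Info dataInfo = new Info();
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public Info getDataInfo() { return dataInfo; }
        public void setDataInfo(Info i) { dataInfo = i; }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();

        Entity visible = new Entity();
        visible.setId("a");
        Entity hidden = new Entity();
        hidden.setId("b");
        hidden.getDataInfo().setInvisible(true);

        Dataset<Entity> ds = spark.createDataset(Arrays.asList(visible, hidden), Encoders.bean(Entity.class));
        // Same expression the patch adds: nested field access over the bean encoder.
        ds.filter("dataInfo.invisible == false").show(); // keeps only "a"
        spark.stop();
    }
}
```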