From b35c59eb42465d6e7dd9981e7f745d4a7eeb5dcb Mon Sep 17 00:00:00 2001 From: Michele Artini Date: Mon, 20 Jan 2020 16:04:19 +0100 Subject: [PATCH] partial implementation of entities from db --- .../dhp-build-properties-maven-plugin/pom.xml | 35 + .../eu/dnetlib/dhp/schema/dli/Relation.java | 61 +- .../MigrateDbEntitiesApplication.java | 255 ++++-- dhp-workflows/dhp-dedup/pom.xml | 31 + .../dnetlib/dhp/graph/GraphMappingUtils.java | 36 +- .../dhp/graph/SparkGraphImporterJob.java | 63 +- pom.xml | 759 +++++++++--------- 7 files changed, 702 insertions(+), 538 deletions(-) diff --git a/dhp-build/dhp-build-properties-maven-plugin/pom.xml b/dhp-build/dhp-build-properties-maven-plugin/pom.xml index 4f99d5298..7b50acd3d 100644 --- a/dhp-build/dhp-build-properties-maven-plugin/pom.xml +++ b/dhp-build/dhp-build-properties-maven-plugin/pom.xml @@ -76,6 +76,41 @@ + + + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + + org.apache.maven.plugins + + + maven-plugin-plugin + + + [3.2,) + + + descriptor + + + + + + + + + + + + diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Relation.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Relation.java index b83cccb73..66007e21d 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Relation.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dli/Relation.java @@ -5,43 +5,48 @@ import java.util.List; public class Relation implements Serializable { - private String source; + /** + * + */ + private static final long serialVersionUID = -9103706796710618813L; - private String target; + private String source; - private List provenance; + private String target; - private RelationSemantic semantic; + private List provenance; - public String getSource() { - return source; - } + private RelationSemantic semantic; - public void setSource(String source) { - this.source = source; - } + public String getSource() { + return source; + } - public String getTarget() { - return target; - } + public 
void setSource(final String source) { + this.source = source; + } - public void setTarget(String target) { - this.target = target; - } + public String getTarget() { + return target; + } - public List getProvenance() { - return provenance; - } + public void setTarget(final String target) { + this.target = target; + } - public void setProvenance(List provenance) { - this.provenance = provenance; - } + public List getProvenance() { + return provenance; + } - public RelationSemantic getSemantic() { - return semantic; - } + public void setProvenance(final List provenance) { + this.provenance = provenance; + } - public void setSemantic(RelationSemantic semantic) { - this.semantic = semantic; - } + public RelationSemantic getSemantic() { + return semantic; + } + + public void setSemantic(final RelationSemantic semantic) { + this.semantic = semantic; + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateDbEntitiesApplication.java index 60a7c24f7..efc395812 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateDbEntitiesApplication.java @@ -2,11 +2,17 @@ package eu.dnetlib.dhp.migration; import java.io.Closeable; import java.io.IOException; +import java.sql.Array; import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Date; +import java.util.List; import java.util.function.Consumer; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -15,14 +21,21 @@ import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.Organization; import 
eu.dnetlib.dhp.schema.oaf.Project; +import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; public class MigrateDbEntitiesApplication extends AbstractMigrateApplication implements Closeable { + private static final Qualifier ENTITYREGISTRY_PROVENANCE_ACTION = MigrationUtils + .qualifier("sysimport:crosswalk:entityregistry", "sysimport:crosswalk:entityregistry", "dnet:provenance_actions", "dnet:provenance_actions"); + private static final Log log = LogFactory.getLog(MigrateDbEntitiesApplication.class); private final DbClient dbClient; + private final long lastUpdateTimestamp; + public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils.toString(MigrateDbEntitiesApplication.class.getResourceAsStream("/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json"))); @@ -51,6 +64,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrateApplication imp final String dbPassword) throws Exception { super(hdfsPath, hdfsNameNode, hdfsUser); this.dbClient = new DbClient(dbUrl, dbUser, dbPassword); + this.lastUpdateTimestamp = new Date().getTime(); } public void execute(final String sqlFile, final Consumer consumer) throws Exception { @@ -61,7 +75,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrateApplication imp public void processDatasource(final ResultSet rs) { try { - final DataInfo info = MigrationUtils.dataInfo(null, null, null, null, null, null); // TODO + final DataInfo info = prepareDataInfo(rs); final Datasource ds = new Datasource(); @@ -74,8 +88,8 @@ public class MigrateDbEntitiesApplication extends AbstractMigrateApplication imp ds.setExtraInfo(null); // TODO ds.setOaiprovenance(null); // TODO - ds.setDatasourcetype(null); // Qualifier datasourcetype) { - ds.setOpenairecompatibility(null); // Qualifier openairecompatibility) { + 
ds.setDatasourcetype(prepareQualifierSplitting(rs.getString("datasourcetype"))); + ds.setOpenairecompatibility(prepareQualifierSplitting(rs.getString("openairecompatibility"))); ds.setOfficialname(MigrationUtils.field(rs.getString("officialname"), info)); ds.setEnglishname(MigrationUtils.field(rs.getString("englishname"), info)); ds.setWebsiteurl(MigrationUtils.field(rs.getString("websiteurl"), info)); @@ -86,7 +100,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrateApplication imp ds.setLongitude(MigrationUtils.field(Double.toString(rs.getDouble("longitude")), info)); ds.setDateofvalidation(MigrationUtils.field(rs.getDate("dateofvalidation").toString(), info)); ds.setDescription(MigrationUtils.field(rs.getString("description"), info)); - ds.setSubjects(null); // List subjects) { + ds.setSubjects(prepareListOfStructProps(rs.getArray("subjects"), info)); ds.setOdnumberofitems(MigrationUtils.field(Double.toString(rs.getInt("odnumberofitems")), info)); ds.setOdnumberofitemsdate(MigrationUtils.field(rs.getDate("odnumberofitemsdate").toString(), info)); ds.setOdpolicies(MigrationUtils.field(rs.getString("odpolicies"), info)); @@ -110,12 +124,15 @@ public class MigrateDbEntitiesApplication extends AbstractMigrateApplication imp ds.setPolicies(null); // List // TODO ds.setJournal(null); // Journal // TODO + ds.setDataInfo(info); + ds.setLastupdatetimestamp(lastUpdateTimestamp); + // rs.getString("datasourceid"); rs.getArray("identities"); // rs.getString("officialname"); // rs.getString("englishname"); // rs.getString("contactemail"); - rs.getString("openairecompatibility"); // COMPLEX ...@@@... + // rs.getString("openairecompatibility"); // COMPLEX ...@@@... 
// rs.getString("websiteurl"); // rs.getString("logourl"); // rs.getArray("accessinfopackage"); @@ -124,15 +141,15 @@ public class MigrateDbEntitiesApplication extends AbstractMigrateApplication imp // rs.getString("namespaceprefix"); // rs.getInt("odnumberofitems"); // NULL // rs.getDate("odnumberofitemsdate"); // NULL - rs.getArray("subjects"); + // rs.getArray("subjects"); // rs.getString("description"); // rs.getString("odpolicies"); // NULL // rs.getArray("odlanguages"); // rs.getArray("odcontenttypes"); - rs.getBoolean("inferred"); // false - rs.getBoolean("deletedbyinference");// false - rs.getDouble("trust"); // 0.9 - rs.getString("inferenceprovenance"); // NULL + // rs.getBoolean("inferred"); // false + // rs.getBoolean("deletedbyinference");// false + // rs.getDouble("trust"); // 0.9 + // rs.getString("inferenceprovenance"); // NULL // rs.getDate("dateofcollection"); // rs.getDate("dateofvalidation"); // rs.getDate("releasestartdate"); @@ -152,21 +169,22 @@ public class MigrateDbEntitiesApplication extends AbstractMigrateApplication imp rs.getArray("policies"); // rs.getString("collectedfromid"); // rs.getString("collectedfromname"); - rs.getString("datasourcetype"); // COMPLEX XXX@@@@.... - rs.getString("provenanceaction"); // 'sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions' - // AS provenanceaction, + // rs.getString("datasourcetype"); // COMPLEX XXX@@@@.... 
+ // rs.getString("provenanceaction"); // + // 'sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions' + // AS provenanceaction, rs.getString("journal"); // CONCAT(d.issn, '@@@', d.eissn, '@@@', d.lissn) AS journal emitOaf(ds); } catch (final Exception e) { - // TODO: handle exception + throw new RuntimeException(e); } } public void processProject(final ResultSet rs) { try { - final DataInfo info = MigrationUtils.dataInfo(null, null, null, null, null, null); // TODO + final DataInfo info = prepareDataInfo(rs); final Project p = new Project(); @@ -192,9 +210,9 @@ public class MigrateDbEntitiesApplication extends AbstractMigrateApplication imp p.setEcsc39(MigrationUtils.field(Boolean.toString(rs.getBoolean("ecsc39")), info)); p.setOamandatepublications(MigrationUtils.field(Boolean.toString(rs.getBoolean("oamandatepublications")), info)); p.setEcarticle29_3(MigrationUtils.field(Boolean.toString(rs.getBoolean("ecarticle29_3")), info)); - p.setSubjects(null); // List //TODO + p.setSubjects(prepareListOfStructProps(rs.getArray("subjects"), info)); p.setFundingtree(null); // List> //TODO - p.setContracttype(null); // Qualifier //TODO + p.setContracttype(prepareQualifierSplitting(rs.getString("contracttype"))); p.setOptional1(MigrationUtils.field(rs.getString("optional1"), info)); p.setOptional2(MigrationUtils.field(rs.getString("optional2"), info)); p.setJsonextrainfo(MigrationUtils.field(rs.getString("jsonextrainfo"), info)); @@ -207,6 +225,9 @@ public class MigrateDbEntitiesApplication extends AbstractMigrateApplication imp p.setTotalcost(new Float(rs.getDouble("totalcost"))); p.setFundedamount(new Float(rs.getDouble("fundedamount"))); + p.setDataInfo(info); + p.setLastupdatetimestamp(lastUpdateTimestamp); + // rs.getString("projectid"); // rs.getString("code"); // rs.getString("websiteurl"); @@ -222,13 +243,13 @@ public class MigrateDbEntitiesApplication extends AbstractMigrateApplication imp // 
rs.getBoolean("ecarticle29_3"); // rs.getDate("dateofcollection"); // rs.getDate("dateoftransformation"); - rs.getBoolean("inferred"); - rs.getBoolean("deletedbyinference"); - rs.getDouble("trust"); - rs.getString("inferenceprovenance"); + // rs.getBoolean("inferred"); + // rs.getBoolean("deletedbyinference"); + // rs.getDouble("trust"); + // rs.getString("inferenceprovenance"); // rs.getString("optional1"); // rs.getString("optional2"); - rs.getString("jsonextrainfo"); + // rs.getString("jsonextrainfo"); // rs.getString("contactfullname"); // rs.getString("contactfax"); // rs.getString("contactphone"); @@ -248,14 +269,14 @@ public class MigrateDbEntitiesApplication extends AbstractMigrateApplication imp emitOaf(p); } catch (final Exception e) { - // TODO: handle exception + throw new RuntimeException(e); } } public void processOrganization(final ResultSet rs) { try { - final DataInfo info = MigrationUtils.dataInfo(null, null, null, null, null, null); // TODO + final DataInfo info = prepareDataInfo(rs); final Organization o = new Organization(); @@ -269,7 +290,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrateApplication imp o.setOaiprovenance(null); // OAIProvenance // TODO o.setLegalshortname(MigrationUtils.field("legalshortname", info)); o.setLegalname(MigrationUtils.field("legalname", info)); - o.setAlternativeNames(null); // List> //TODO + o.setAlternativeNames(new ArrayList<>()); o.setWebsiteurl(MigrationUtils.field("websiteurl", info)); o.setLogourl(MigrationUtils.field("logourl", info)); o.setEclegalbody(MigrationUtils.field(Boolean.toString(rs.getBoolean("eclegalbody")), info)); @@ -283,7 +304,10 @@ public class MigrateDbEntitiesApplication extends AbstractMigrateApplication imp o.setEcenterprise(MigrationUtils.field(Boolean.toString(rs.getBoolean("ecenterprise")), info)); o.setEcsmevalidated(MigrationUtils.field(Boolean.toString(rs.getBoolean("ecsmevalidated")), info)); 
o.setEcnutscode(MigrationUtils.field(Boolean.toString(rs.getBoolean("ecnutscode")), info)); - o.setCountry(null); // Qualifier country) { + o.setCountry(prepareQualifierSplitting(rs.getString("country"))); + + o.setDataInfo(info); + o.setLastupdatetimestamp(lastUpdateTimestamp); // rs.getString("organizationid"); // rs.getString("legalshortname"); @@ -300,87 +324,160 @@ public class MigrateDbEntitiesApplication extends AbstractMigrateApplication imp // rs.getBoolean("ecenterprise"); // rs.getBoolean("ecsmevalidated"); // rs.getBoolean("ecnutscode"); - rs.getDate("dateofcollection"); - rs.getDate("dateoftransformation"); - rs.getBoolean("inferred"); - rs.getBoolean("deletedbyinference"); - rs.getDouble("trust"); - rs.getString("inferenceprovenance"); + // rs.getDate("dateofcollection"); + // rs.getDate("dateoftransformation"); + // rs.getBoolean("inferred"); + // rs.getBoolean("deletedbyinference"); + // rs.getDouble("trust"); + // rs.getString("inferenceprovenance"); // rs.getString("collectedfromid"); // rs.getString("collectedfromname"); - rs.getString("country"); + // rs.getString("country"); rs.getString("provenanceaction"); rs.getArray("pid"); emitOaf(o); } catch (final Exception e) { - // TODO: handle exception + throw new RuntimeException(e); } } public void processDatasourceOrganization(final ResultSet rs) { try { - final Relation r = new Relation(); + final DataInfo info = prepareDataInfo(rs); + final String orgId = MigrationUtils.createOpenaireId("20", rs.getString("organization")); + final String dsId = MigrationUtils.createOpenaireId("10", rs.getString("datasource")); - r.setRelType(null); // TODO - r.setSubRelType(null); // TODO - r.setRelClass(null); // TODO - r.setSource(null); // TODO - r.setTarget(null); // TODO - r.setCollectedFrom(MigrationUtils.listKeyValues("", "")); + final Relation r1 = new Relation(); + r1.setRelType("datasourceOrganization"); + r1.setSubRelType("provision"); + r1.setRelClass("isProvidedBy"); + r1.setSource(dsId); + 
r1.setTarget(orgId); + r1.setCollectedFrom(null);// TODO + r1.setDataInfo(info); + r1.setLastupdatetimestamp(lastUpdateTimestamp); + emitOaf(r1); - rs.getString("datasource"); - rs.getString("organization"); - rs.getDate("startdate"); // NULL - rs.getDate("enddate"); // NULL - rs.getBoolean("inferred"); // false - rs.getBoolean("deletedbyinference"); // false - rs.getDouble("trust"); // 0.9 - rs.getString("inferenceprovenance"); // NULL - rs.getString("semantics"); // 'providedBy@@@provided - // by@@@dnet:datasources_organizations_typologies@@@dnet:datasources_organizations_typologies' AS - // semantics, - rs.getString("provenanceaction"); // d.provenanceaction || '@@@' || d.provenanceaction || - // '@@@dnet:provenanceActions@@@dnet:provenanceActions' AS provenanceaction + final Relation r2 = new Relation(); + r2.setRelType("datasourceOrganization"); + r2.setSubRelType("provision"); + r2.setRelClass("provides"); + r2.setSource(orgId); + r2.setTarget(dsId); + r2.setCollectedFrom(null); // TODO + r2.setDataInfo(info); + r2.setLastupdatetimestamp(lastUpdateTimestamp); + emitOaf(r2); + + // rs.getString("datasource"); + // rs.getString("organization"); + // rs.getDate("startdate"); // NULL + // rs.getDate("enddate"); // NULL + // rs.getBoolean("inferred"); // false + // rs.getBoolean("deletedbyinference"); // false + // rs.getDouble("trust"); // 0.9 + // rs.getString("inferenceprovenance"); // NULL + // rs.getString("semantics"); // 'providedBy@@@provided + // by@@@dnet:datasources_organizations_typologies@@@dnet:datasources_organizations_typologies' AS + // semantics, + // rs.getString("provenanceaction"); // d.provenanceaction || '@@@' || d.provenanceaction || + // '@@@dnet:provenanceActions@@@dnet:provenanceActions' AS provenanceaction - emitOaf(r); } catch (final Exception e) { - // TODO: handle exception + throw new RuntimeException(e); } } public void processProjectOrganization(final ResultSet rs) { try { - final Relation r = new Relation(); + final DataInfo info
= prepareDataInfo(rs); + final String orgId = MigrationUtils.createOpenaireId("20", rs.getString("resporganization")); + final String projectId = MigrationUtils.createOpenaireId("40", rs.getString("project")); - r.setRelType(null); // TODO - r.setSubRelType(null); // TODO - r.setRelClass(null); // TODO - r.setSource(null); // TODO - r.setTarget(null); // TODO - r.setCollectedFrom(null); + final Relation r1 = new Relation(); + r1.setRelType("projectOrganization"); + r1.setSubRelType("participation"); + r1.setRelClass("isParticipant"); + r1.setSource(projectId); + r1.setTarget(orgId); + r1.setCollectedFrom(null);// TODO + r1.setDataInfo(info); + r1.setLastupdatetimestamp(lastUpdateTimestamp); + emitOaf(r1); + + final Relation r2 = new Relation(); + r2.setRelType("projectOrganization"); + r2.setSubRelType("participation"); + r2.setRelClass("hasParticipant"); + r2.setSource(orgId); + r2.setTarget(projectId); + r2.setCollectedFrom(null); // TODO + r2.setDataInfo(info); + r2.setLastupdatetimestamp(lastUpdateTimestamp); + emitOaf(r2); + + // rs.getString("project"); + // rs.getString("resporganization"); + // rs.getInt("participantnumber"); + // rs.getDouble("contribution"); + // rs.getDate("startdate");// null + // rs.getDate("enddate");// null + // rs.getBoolean("inferred");// false + // rs.getBoolean("deletedbyinference"); // false + // rs.getDouble("trust"); + // rs.getString("inferenceprovenance"); // NULL + // rs.getString("semantics"); // po.semanticclass || '@@@' || po.semanticclass || + // '@@@dnet:project_organization_relations@@@dnet:project_organization_relations' AS semantics, + // rs.getString("provenanceaction"); // + // 'sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions' + // AS provenanceaction - rs.getString("project"); - rs.getString("resporganization"); - rs.getInt("participantnumber"); - rs.getDouble("contribution"); - rs.getDate("startdate");// null - rs.getDate("enddate");//
null - rs.getBoolean("inferred");// false - rs.getBoolean("deletedbyinference"); // false - rs.getDouble("trust"); - rs.getString("inferenceprovenance"); // NULL - rs.getString("semantics"); // po.semanticclass || '@@@' || po.semanticclass || - // '@@@dnet:project_organization_relations@@@dnet:project_organization_relations' AS semantics, - rs.getString("provenanceaction"); // 'sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions' - // AS provenanceaction - emitOaf(r); } catch (final Exception e) { - // TODO: handle exception + throw new RuntimeException(e); } } + private DataInfo prepareDataInfo(final ResultSet rs) throws SQLException { + final Boolean deletedbyinference = rs.getBoolean("deletedbyinference"); + final String inferenceprovenance = rs.getString("inferenceprovenance"); + final Boolean inferred = rs.getBoolean("inferred"); + final String trust = rs.getString("trust"); + return MigrationUtils.dataInfo(deletedbyinference, inferenceprovenance, inferred, false, ENTITYREGISTRY_PROVENANCE_ACTION, trust); + } + + private Qualifier prepareQualifierSplitting(final String s) { + if (StringUtils.isBlank(s)) { return null; } + final String[] arr = s.split("@@@"); + return arr.length == 4 ? 
MigrationUtils.qualifier(arr[0], arr[1], arr[2], arr[3]) : null; + } + + private StructuredProperty prepareStructProp(final String s, final DataInfo dataInfo) { + if (StringUtils.isBlank(s)) { return null; } + final String[] parts = s.split("###"); + if (parts.length == 2) { + final String value = parts[0]; + final String[] arr = parts[1].split("@@@"); + if (arr.length == 4) { return MigrationUtils.structuredProperty(value, arr[0], arr[1], arr[2], arr[3], dataInfo); } + } + return null; + } + + private List prepareListOfStructProps(final Array array, final DataInfo dataInfo) throws SQLException { + final List res = new ArrayList<>(); + if (array != null) { + for (final String s : (String[]) array.getArray()) { + final StructuredProperty sp = prepareStructProp(s, dataInfo); + if (sp != null) { + res.add(sp); + } + } + } + + return res; + } + @Override public void close() throws IOException { super.close(); diff --git a/dhp-workflows/dhp-dedup/pom.xml b/dhp-workflows/dhp-dedup/pom.xml index 28ef6a453..81ac94f01 100644 --- a/dhp-workflows/dhp-dedup/pom.xml +++ b/dhp-workflows/dhp-dedup/pom.xml @@ -9,6 +9,37 @@ 4.0.0 dhp-dedup + + + + + net.alchim31.maven + scala-maven-plugin + 4.0.1 + + + scala-compile-first + initialize + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + ${scala.version} + + + + + diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java index ab19ff2b5..0291be47e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java @@ -1,23 +1,31 @@ package eu.dnetlib.dhp.graph; -import com.google.common.collect.Maps; -import eu.dnetlib.dhp.schema.oaf.*; - import java.util.Map; +import com.google.common.collect.Maps; + +import 
eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.Datasource; +import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; +import eu.dnetlib.dhp.schema.oaf.Project; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.Software; + public class GraphMappingUtils { - public final static Map types = Maps.newHashMap(); + public final static Map types = Maps.newHashMap(); - static { - types.put("datasource", Datasource.class); - types.put("organization", Organization.class); - types.put("project", Project.class); - types.put("dataset", Dataset.class); - types.put("otherresearchproduct", OtherResearchProduct.class); - types.put("software", Software.class); - types.put("publication", Publication.class); - types.put("relation", Relation.class); - } + static { + types.put("datasource", Datasource.class); + types.put("organization", Organization.class); + types.put("project", Project.class); + types.put("dataset", Dataset.class); + types.put("otherresearchproduct", OtherResearchProduct.class); + types.put("software", Software.class); + types.put("publication", Publication.class); + types.put("relation", Relation.class); + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java index a6a4e9291..463bffae9 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java @@ -1,7 +1,5 @@ package eu.dnetlib.dhp.graph; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.spark.api.java.JavaRDD; @@ -9,42 +7,47 @@ 
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; import scala.Tuple2; public class SparkGraphImporterJob { - public static void main(String[] args) throws Exception { + public static void main(final String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json"))); - parser.parseArgument(args); - final SparkSession spark = SparkSession - .builder() - .appName(SparkGraphImporterJob.class.getSimpleName()) - .master(parser.get("master")) - .config("hive.metastore.uris", parser.get("hive_metastore_uris")) - .enableHiveSupport() - .getOrCreate(); + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json"))); + parser.parseArgument(args); + final SparkSession spark = SparkSession + .builder() + .appName(SparkGraphImporterJob.class.getSimpleName()) + .master(parser.get("master")) + .config("hive.metastore.uris", parser.get("hive_metastore_uris")) + .enableHiveSupport() + .getOrCreate(); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - final String inputPath = parser.get("sourcePath"); - final String hiveDbName = parser.get("hive_db_name"); + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final String inputPath = parser.get("sourcePath"); + final String hiveDbName = parser.get("hive_db_name"); - spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName)); + spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName)); - // Read the input file and convert it into RDD of 
serializable object - GraphMappingUtils.types.forEach((name, clazz) -> { - final JavaRDD> inputRDD = sc.sequenceFile(inputPath + "/" + name, Text.class, Text.class) - .map(item -> new Tuple2<>(item._1.toString(), item._2.toString())); + // Read the input file and convert it into RDD of serializable object + GraphMappingUtils.types.forEach((name, clazz) -> { + final JavaRDD> inputRDD = sc.sequenceFile(inputPath + "/" + name, Text.class, Text.class) + .map(item -> new Tuple2<>(item._1.toString(), item._2.toString())); - spark.createDataset(inputRDD - .filter(s -> s._1().equals(clazz.getName())) - .map(Tuple2::_2) - .map(s -> new ObjectMapper().readValue(s, clazz)) - .rdd(), Encoders.bean(clazz)) - .write() - .mode(SaveMode.Overwrite) - .saveAsTable(hiveDbName + "." + name); - }); + spark.createDataset(inputRDD + .filter(s -> s._1().equals(clazz.getName())) + .map(Tuple2::_2) + .map(s -> new ObjectMapper().readValue(s, clazz)) + .rdd(), Encoders.bean(clazz)) + .write() + .mode(SaveMode.Overwrite) + .saveAsTable(hiveDbName + "." 
+ name); + }); - } + } } diff --git a/pom.xml b/pom.xml index aedf5ebff..a27cf4fe7 100644 --- a/pom.xml +++ b/pom.xml @@ -1,426 +1,411 @@ - + - 4.0.0 - eu.dnetlib.dhp - dhp - 1.0.5-SNAPSHOT - pom + 4.0.0 + eu.dnetlib.dhp + dhp + 1.0.5-SNAPSHOT + pom - http://www.d-net.research-infrastructures.eu + http://www.d-net.research-infrastructures.eu - - - The Apache Software License, Version 2.0 - http://www.apache.org/licenses/LICENSE-2.0.txt - repo - A business-friendly OSS license - - + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + A business-friendly OSS license + + - - dhp-build - dhp-schemas - dhp-common - dhp-workflows - + + dhp-build + dhp-schemas + dhp-common + dhp-workflows + - - Redmine - https://issue.openaire.research-infrastructures.eu/projects/openaire - + + Redmine + https://issue.openaire.research-infrastructures.eu/projects/openaire + - - jenkins - https://jenkins-dnet.d4science.org/ - + + jenkins + https://jenkins-dnet.d4science.org/ + - - scm:git:gitea@code-repo.d4science.org:D-Net/dnet-hadoop.git - scm:git:gitea@code-repo.d4science.org:D-Net/dnet-hadoop.git - https://code-repo.d4science.org/D-Net/dnet-hadoop/ - HEAD - + + scm:git:gitea@code-repo.d4science.org:D-Net/dnet-hadoop.git + scm:git:gitea@code-repo.d4science.org:D-Net/dnet-hadoop.git + https://code-repo.d4science.org/D-Net/dnet-hadoop/ + HEAD + - - + + - - - dnet45-releases - D-Net 45 releases - http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases - default - - false - - - true - - - - cloudera - Cloudera Repository - https://repository.cloudera.com/artifactory/cloudera-repos - - true - - - false - - - + + + dnet45-releases + D-Net 45 releases + http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases + default + + false + + + true + + + + cloudera + Cloudera Repository + https://repository.cloudera.com/artifactory/cloudera-repos + + true + + + false + + + - - - junit - junit - 
4.12 - test - + + + junit + junit + 4.12 + test + - - org.mockito - mockito-core - 2.7.22 - test - + + org.mockito + mockito-core + 2.7.22 + test + - + - - - - org.apache.hadoop - hadoop-hdfs - ${dhp.hadoop.version} - provided - - - org.apache.hadoop - hadoop-client - ${dhp.hadoop.version} - provided - - - org.apache.spark - spark-core_2.11 - ${dhp.spark.version} - provided - - - org.apache.spark - spark-sql_2.11 - ${dhp.spark.version} - provided - - - org.apache.spark - spark-graphx_2.11 - ${dhp.spark.version} - provided - + + + + org.apache.hadoop + hadoop-hdfs + ${dhp.hadoop.version} + provided + + + org.apache.hadoop + hadoop-client + ${dhp.hadoop.version} + provided + + + org.apache.spark + spark-core_2.11 + ${dhp.spark.version} + provided + + + org.apache.spark + spark-sql_2.11 + ${dhp.spark.version} + provided + + + org.apache.spark + spark-graphx_2.11 + ${dhp.spark.version} + provided + - - org.apache.commons - commons-lang3 - ${dhp.commons.lang.version} - + + org.apache.commons + commons-lang3 + ${dhp.commons.lang.version} + - - commons-codec - commons-codec - 1.9 - + + commons-codec + commons-codec + 1.9 + - - commons-io - commons-io - 2.4 - + + commons-io + commons-io + 2.4 + - - commons-cli - commons-cli - 1.2 - provided - + + commons-cli + commons-cli + 1.2 + provided + - - net.sf.saxon - Saxon-HE - 9.5.1-5 - + + net.sf.saxon + Saxon-HE + 9.5.1-5 + - - dom4j - dom4j - 1.6.1 - + + dom4j + dom4j + 1.6.1 + - - xml-apis - xml-apis - 1.4.01 - + + xml-apis + xml-apis + 1.4.01 + - - jaxen - jaxen - 1.1.6 - + + jaxen + jaxen + 1.1.6 + - - net.schmizz - sshj - 0.10.0 - test - + + net.schmizz + sshj + 0.10.0 + test + - - com.fasterxml.jackson.core - jackson-core - ${dhp.jackson.version} - provided - + + com.fasterxml.jackson.core + jackson-core + ${dhp.jackson.version} + provided + - - com.fasterxml.jackson.core - jackson-annotations - ${dhp.jackson.version} - provided - - - com.fasterxml.jackson.core - jackson-databind - ${dhp.jackson.version} - provided - + + 
com.fasterxml.jackson.core + jackson-annotations + ${dhp.jackson.version} + provided + + + com.fasterxml.jackson.core + jackson-databind + ${dhp.jackson.version} + provided + - - eu.dnetlib - dnet-pace-core - 4.0.0-SNAPSHOT - + + eu.dnetlib + dnet-pace-core + 4.0.0-SNAPSHOT + - - javax.persistence - javax.persistence-api - 2.2 - provided - + + javax.persistence + javax.persistence-api + 2.2 + provided + - - com.rabbitmq - amqp-client - 5.6.0 - - - com.jayway.jsonpath - json-path - 2.4.0 - - - com.arakelian - java-jq - 0.10.1 - - - edu.cmu - secondstring - 1.0.0 - + + com.rabbitmq + amqp-client + 5.6.0 + + + com.jayway.jsonpath + json-path + 2.4.0 + + + com.arakelian + java-jq + 0.10.1 + + + edu.cmu + secondstring + 1.0.0 + - - org.apache.oozie - oozie-client - ${dhp.oozie.version} - provided - - - - slf4j-simple - org.slf4j - - - - - + + org.mongodb + mongo-java-driver + ${mongodb.driver.version} + - - target - target/classes - ${project.artifactId}-${project.version} - target/test-classes - - - - org.apache.maven.plugins - maven-compiler-plugin - ${maven.compiler.plugin.version} - - 1.8 - 1.8 - ${project.build.sourceEncoding} - - + + org.apache.oozie + oozie-client + ${dhp.oozie.version} + provided + + + + slf4j-simple + org.slf4j + + + + + - - org.apache.maven.plugins - maven-jar-plugin - 3.0.2 - + + target + target/classes + ${project.artifactId}-${project.version} + target/test-classes + + + + org.apache.maven.plugins + maven-compiler-plugin + ${maven.compiler.plugin.version} + + 1.8 + 1.8 + ${project.build.sourceEncoding} + + - - org.apache.maven.plugins - maven-source-plugin - 3.0.1 - - - attach-sources - verify - - jar-no-fork - - - - + + org.apache.maven.plugins + maven-jar-plugin + 3.0.2 + - - org.apache.maven.plugins - maven-surefire-plugin - 2.19.1 - - true - - - - org.apache.maven.plugins - maven-javadoc-plugin - 2.10.4 - - true - - - - org.apache.maven.plugins - maven-dependency-plugin - 3.0.0 - + + org.apache.maven.plugins + maven-source-plugin + 
3.0.1 + + + attach-sources + verify + + jar-no-fork + + + + - - org.codehaus.mojo - build-helper-maven-plugin - 1.12 - - - - - - org.apache.maven.plugins - maven-release-plugin - 2.5.3 - - - org.jacoco - jacoco-maven-plugin - 0.7.9 - - - **/schemas/* - **/com/cloudera/**/* - **/org/apache/avro/io/**/* - - - - - default-prepare-agent - - prepare-agent - - - - default-report - prepare-package - - report - - - - - - net.alchim31.maven - scala-maven-plugin - 4.0.1 - - - scala-compile-first - initialize - - add-source - compile - - - - scala-test-compile - process-test-resources - - testCompile - - - - - ${scala.version} - - - + + org.apache.maven.plugins + maven-surefire-plugin + 2.19.1 + + true + + + + org.apache.maven.plugins + maven-javadoc-plugin + 2.10.4 + + true + + + + org.apache.maven.plugins + maven-dependency-plugin + 3.0.0 + - - - org.apache.maven.wagon - wagon-ssh - 2.10 - - - - - - dnet45-snapshots - DNet45 Snapshots - http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots - default - - - dnet45-releases - http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases - - - - - - org.apache.maven.plugins - maven-javadoc-plugin - 2.10.4 - - true - - - - + + org.codehaus.mojo + build-helper-maven-plugin + 1.12 + + + + + + org.apache.maven.plugins + maven-release-plugin + 2.5.3 + + + org.jacoco + jacoco-maven-plugin + 0.7.9 + + + **/schemas/* + **/com/cloudera/**/* + **/org/apache/avro/io/**/* + + + + + default-prepare-agent + + prepare-agent + + + + default-report + prepare-package + + report + + + + - - UTF-8 - UTF-8 - 3.6.0 - 2.22.2 - cdh5.9.2 - 2.6.0-${dhp.cdh.version} - 4.1.0-${dhp.cdh.version} - 2.4.0.cloudera2 - 2.9.6 - 3.5 - 2.11.12 - + + + + + org.apache.maven.wagon + wagon-ssh + 2.10 + + + + + + dnet45-snapshots + DNet45 Snapshots + http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots + default + + + dnet45-releases + 
http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 2.10.4 + + true + + + + + + + UTF-8 + UTF-8 + 3.6.0 + 2.22.2 + cdh5.9.2 + 2.6.0-${dhp.cdh.version} + 4.1.0-${dhp.cdh.version} + 2.4.0.cloudera2 + 2.9.6 + 3.5 + 2.11.12 + 3.4.2 +