diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractDbApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractDbApplication.java index bdfc8fec7..7f4dd8ea3 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractDbApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractDbApplication.java @@ -244,7 +244,7 @@ public abstract class AbstractDbApplication extends AbstractMigrationApplication listKeyValues( createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"))); - o.setPid(new ArrayList<>()); + o.setPid(prepareListOfStructProps(rs.getArray("pid"), info)); o.setDateofcollection(asString(rs.getDate("dateofcollection"))); o.setDateoftransformation(asString(rs.getDate("dateoftransformation"))); o.setExtraInfo(new ArrayList<>()); // Values not present in the DB diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateCordaOrgsApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateCordaOrgsApplication.java new file mode 100644 index 000000000..0e3269b1c --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateCordaOrgsApplication.java @@ -0,0 +1,64 @@ + +package eu.dnetlib.dhp.oa.graph.raw; + +import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.DbClient; +import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; + +public class MigrateCordaOrgsApplication extends AbstractDbApplication { + + private static final Logger log = LoggerFactory.getLogger(MigrateCordaOrgsApplication.class); + + public static final String SOURCE_TYPE = "source_type"; + public static final String TARGET_TYPE = "target_type"; + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + MigrateCordaOrgsApplication.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_openorgs_parameters.json"))); + + parser.parseArgument(args); + + final String dbUrl = parser.get("postgresUrl"); + log.info("postgresUrl: {}", dbUrl); + + final String dbUser = parser.get("postgresUser"); + log.info("postgresUser: {}", dbUser); + + final String dbPassword = parser.get("postgresPassword"); + log.info("postgresPassword: xxx"); + + final String isLookupUrl = parser.get("isLookupUrl"); + log.info("isLookupUrl: {}", isLookupUrl); + + final String hdfsPath = parser.get("hdfsPath"); + log.info("hdfsPath: {}", hdfsPath); + + try (final MigrateCordaOrgsApplication mapper = new MigrateCordaOrgsApplication(hdfsPath, dbUrl, dbUser, + dbPassword, isLookupUrl)) { + + log.info("Processing CORDA orgs..."); + mapper.execute("queryCordaOrganizations.sql", mapper::processOrganization); + + } + log.info("All done."); + + } + + public MigrateCordaOrgsApplication(final String hdfsPath, final String dbUrl, final String dbUser, + final String dbPassword, final String isLookupUrl) + throws Exception { + super(hdfsPath, dbUrl, dbUser, dbPassword, isLookupUrl); + } + + protected MigrateCordaOrgsApplication(final DbClient dbClient, final VocabularyGroup vocs) { // ONLY FOT TESTS + super(dbClient, vocs); + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_openorgs/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_openorgs/oozie_app/workflow.xml index 9be62373e..7c42d9a1e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_openorgs/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_openorgs/oozie_app/workflow.xml @@ -1,9 +1,26 @@ - + + + + graphOutputPath + the target path to store raw graph + contentPath path location to store (or reuse) content from the aggregator + + postgresURL + the postgres URL to access to the database + + + postgresUser + the user postgres + + + postgresPassword + the password postgres + postgresOpenOrgsURL the postgres URL to access to the OpenOrgs database @@ -15,6 +32,19 @@ postgresOpenOrgsPassword the password of OpenOrgs database + + + dbSchema + beta + the database schema according to the D-Net infrastructure (beta or production) + + + mongoURL + mongoDB url, example: mongodb://[username:password@]host[:port] + + + mongoDb + mongo database isLookupUrl @@ -76,13 +106,18 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + + + + + + @@ -94,9 +129,76 @@ --postgresPassword${postgresOpenOrgsPassword} --isLookupUrl${isLookupUrl} + + + + + + + + + + eu.dnetlib.dhp.oa.graph.raw.MigrateCordaOrgsApplication + --hdfsPath${contentPath}/db_cordaorgs + --postgresUrl${postgresURL} + --postgresUser${postgresUser} + --postgresPassword${postgresPassword} + --isLookupUrl${isLookupUrl} + + + + + + + + + + yarn + cluster + GenerateEntities + eu.dnetlib.dhp.oa.graph.raw.GenerateEntitiesApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePaths${contentPath}/db_openorgs,${contentPath}/db_cordaorgs + --targetPath${workingDir}/entities + --isLookupUrl${isLookupUrl} + + + + + + + + yarn + cluster + GenerateGraph + eu.dnetlib.dhp.oa.graph.raw.DispatchEntitiesApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --sourcePath${workingDir}/entities + --graphRawPath${graphOutputPath} + - + - + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryCordaOrganizations.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryCordaOrganizations.sql new file mode 100644 index 000000000..4e0af5dfe --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryCordaOrganizations.sql @@ -0,0 +1,32 @@ +SELECT + o.id AS organizationid, + o.legalshortname AS legalshortname, + o.legalname AS legalname, + ARRAY[]::text[] AS alternativenames, + o.websiteurl AS websiteurl, + o.logourl AS logourl, + o.ec_legalbody AS eclegalbody, + o.ec_legalperson AS eclegalperson, + o.ec_nonprofit AS ecnonprofit, + o.ec_researchorganization AS ecresearchorganization, + o.ec_highereducation AS echighereducation, + o.ec_internationalorganizationeurinterests AS ecinternationalorganizationeurinterests, + o.ec_internationalorganization AS ecinternationalorganization, + o.ec_enterprise AS ecenterprise, + o.ec_smevalidated AS ecsmevalidated, + o.ec_nutscode AS ecnutscode, + o.dateofcollection AS dateofcollection, + o.lastupdate AS dateoftransformation, + false AS inferred, + false AS deletedbyinference, + o.trust AS trust, + '' AS inferenceprovenance, + d.id AS collectedfromid, + d.officialname AS collectedfromname, + o.country || '@@@dnet:countries' AS country, + 'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction, + ARRAY[]::text[] AS pid + +FROM dsm_organizations o + LEFT OUTER JOIN dsm_datasources d ON (d.id = o.collectedfrom) +WHERE o.id like 'corda__%'