openorgs pids

Michele Artini 2020-07-28 16:16:20 +02:00
parent c364b329c4
commit 2c2311b062
4 changed files with 204 additions and 6 deletions

View File

@@ -244,7 +244,7 @@ public abstract class AbstractDbApplication extends AbstractMigrationApplication
listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true),
rs.getString("collectedfromname")));
o.setPid(new ArrayList<>());
o.setPid(prepareListOfStructProps(rs.getArray("pid"), info));
o.setDateofcollection(asString(rs.getDate("dateofcollection")));
o.setDateoftransformation(asString(rs.getDate("dateoftransformation")));
o.setExtraInfo(new ArrayList<>()); // Values not present in the DB
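The change above stops writing an empty pid list and instead fills it from the new pid column returned by the SQL queries. A minimal, illustrative sketch of how such an encoded pid array could be turned into structured properties is shown below; the PidSketch class and the value###classid@@@scheme encoding are assumptions inferred from the encodings visible in the SQL at the end of this commit (see the country and provenanceaction columns), not the project's actual prepareListOfStructProps implementation.

import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for the project's StructuredProperty/Qualifier model (assumption).
class PidSketch {
    final String value;
    final String classid;
    final String scheme;

    PidSketch(final String value, final String classid, final String scheme) {
        this.value = value;
        this.classid = classid;
        this.scheme = scheme;
    }
}

public class PidParsingSketch {

    // Assumed encoding of each array element: "value###classid@@@scheme".
    static List<PidSketch> parsePids(final String[] pidArray) {
        final List<PidSketch> pids = new ArrayList<>();
        if (pidArray == null) {
            return pids;
        }
        for (final String s : pidArray) {
            final String[] valueAndQualifier = s.split("###");
            if (valueAndQualifier.length != 2) {
                continue; // skip malformed entries
            }
            final String[] qualifier = valueAndQualifier[1].split("@@@");
            if (qualifier.length == 2) {
                pids.add(new PidSketch(valueAndQualifier[0], qualifier[0], qualifier[1]));
            }
        }
        return pids;
    }

    public static void main(final String[] args) {
        parsePids(new String[] { "0000-0001-2345-6789###orcid@@@dnet:pid_types" })
            .forEach(p -> System.out.println(p.value + " [" + p.classid + " / " + p.scheme + "]"));
    }
}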

View File

@@ -0,0 +1,64 @@
package eu.dnetlib.dhp.oa.graph.raw;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
public class MigrateCordaOrgsApplication extends AbstractDbApplication {
private static final Logger log = LoggerFactory.getLogger(MigrateCordaOrgsApplication.class);
public static final String SOURCE_TYPE = "source_type";
public static final String TARGET_TYPE = "target_type";
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
MigrateCordaOrgsApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_openorgs_parameters.json")));
parser.parseArgument(args);
final String dbUrl = parser.get("postgresUrl");
log.info("postgresUrl: {}", dbUrl);
final String dbUser = parser.get("postgresUser");
log.info("postgresUser: {}", dbUser);
final String dbPassword = parser.get("postgresPassword");
log.info("postgresPassword: xxx");
final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
final String hdfsPath = parser.get("hdfsPath");
log.info("hdfsPath: {}", hdfsPath);
try (final MigrateCordaOrgsApplication mapper = new MigrateCordaOrgsApplication(hdfsPath, dbUrl, dbUser,
dbPassword, isLookupUrl)) {
log.info("Processing CORDA orgs...");
mapper.execute("queryCordaOrganizations.sql", mapper::processOrganization);
}
log.info("All done.");
}
public MigrateCordaOrgsApplication(final String hdfsPath, final String dbUrl, final String dbUser,
final String dbPassword, final String isLookupUrl)
throws Exception {
super(hdfsPath, dbUrl, dbUser, dbPassword, isLookupUrl);
}
protected MigrateCordaOrgsApplication(final DbClient dbClient, final VocabularyGroup vocs) { // ONLY FOR TESTS
super(dbClient, vocs);
}
}
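For local experimentation, the new application can be driven with the same arguments the ImportDB_corda_orgs Oozie action (further below) passes to it. A minimal sketch follows; every value is a placeholder and the wrapper class is illustrative only.

// Illustrative only: invoke the migration locally with the argument names the
// Oozie action uses; assumes this class lives in (or imports from)
// eu.dnetlib.dhp.oa.graph.raw. All values are placeholders.
public class RunCordaOrgsMigrationLocally {
    public static void main(final String[] args) throws Exception {
        MigrateCordaOrgsApplication.main(new String[] {
            "--hdfsPath", "/tmp/db_cordaorgs",
            "--postgresUrl", "jdbc:postgresql://localhost:5432/dnet",
            "--postgresUser", "dnet",
            "--postgresPassword", "***",
            "--isLookupUrl", "http://localhost:8280/is/services/isLookUp?wsdl"
        });
    }
}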

View File

@@ -1,9 +1,26 @@
<workflow-app name="import OpenOrgs" xmlns="uri:oozie:workflow:0.5">
<workflow-app name="create RAW Graph (openorgs and cordaorgs)" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphOutputPath</name>
<description>the target path to store raw graph</description>
</property>
<property>
<name>contentPath</name>
<description>path location to store (or reuse) content from the aggregator</description>
</property>
<property>
<name>postgresURL</name>
<description>the postgres URL to access the database</description>
</property>
<property>
<name>postgresUser</name>
<description>the postgres user</description>
</property>
<property>
<name>postgresPassword</name>
<description>the postgres password</description>
</property>
<property>
<name>postgresOpenOrgsURL</name>
<description>the postgres URL to access the OpenOrgs database</description>
@@ -15,6 +32,19 @@
<property>
<name>postgresOpenOrgsPassword</name>
<description>the password of OpenOrgs database</description>
</property>
<property>
<name>dbSchema</name>
<value>beta</value>
<description>the database schema according to the D-Net infrastructure (beta or production)</description>
</property>
<property>
<name>mongoURL</name>
<description>mongoDB url, example: mongodb://[username:password@]host[:port]</description>
</property>
<property>
<name>mongoDb</name>
<description>mongo database</description>
</property>
<property>
<name>isLookupUrl</name>
@@ -76,13 +106,18 @@
</configuration>
</global>
<start to="ImportDB_openorgs"/>
<start to="start_import"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ImportDB_openorgs">
<fork name="start_import">
<path start="ImportDB_corda_orgs"/>
<path start="ImportDB_openorgs"/>
</fork>
<action name="ImportDB_openorgs">
<java>
<prepare>
<delete path="${contentPath}/db_openorgs"/>
@@ -94,9 +129,76 @@
<arg>--postgresPassword</arg><arg>${postgresOpenOrgsPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</java>
<ok to="wait_import"/>
<error to="Kill"/>
</action>
<action name="ImportDB_corda_orgs">
<java>
<prepare>
<delete path="${contentPath}/db_cordaorgs"/>
</prepare>
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateCordaOrgsApplication</main-class>
<arg>--hdfsPath</arg><arg>${contentPath}/db_cordaorgs</arg>
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</java>
<ok to="wait_import"/>
<error to="Kill"/>
</action>
<join name="wait_import" to="GenerateEntities"/>
<action name="GenerateEntities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateEntities</name>
<class>eu.dnetlib.dhp.oa.graph.raw.GenerateEntitiesApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePaths</arg><arg>${contentPath}/db_openorgs,${contentPath}/db_cordaorgs</arg>
<arg>--targetPath</arg><arg>${workingDir}/entities</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
<ok to="GenerateGraph"/>
<error to="Kill"/>
</action>
<action name="GenerateGraph">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateGraph</name>
<class>eu.dnetlib.dhp.oa.graph.raw.DispatchEntitiesApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/entities</arg>
<arg>--graphRawPath</arg><arg>${graphOutputPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
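The two forked import actions converge on the wait_import join, and their outputs reach GenerateEntities as a single comma-separated --sourcePaths value. A trivial sketch of splitting such a value into individual HDFS paths, illustrative only and not the application's actual argument handling:

import java.util.Arrays;
import java.util.List;

// Illustrative only: split a comma-separated --sourcePaths value, e.g.
// "${contentPath}/db_openorgs,${contentPath}/db_cordaorgs", into single paths.
public class SourcePathsSketch {
    static List<String> split(final String sourcePaths) {
        return Arrays.asList(sourcePaths.split(","));
    }

    public static void main(final String[] args) {
        split("/data/db_openorgs,/data/db_cordaorgs").forEach(System.out::println);
    }
}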

View File

@@ -0,0 +1,32 @@
SELECT
o.id AS organizationid,
o.legalshortname AS legalshortname,
o.legalname AS legalname,
ARRAY[]::text[] AS alternativenames,
o.websiteurl AS websiteurl,
o.logourl AS logourl,
o.ec_legalbody AS eclegalbody,
o.ec_legalperson AS eclegalperson,
o.ec_nonprofit AS ecnonprofit,
o.ec_researchorganization AS ecresearchorganization,
o.ec_highereducation AS echighereducation,
o.ec_internationalorganizationeurinterests AS ecinternationalorganizationeurinterests,
o.ec_internationalorganization AS ecinternationalorganization,
o.ec_enterprise AS ecenterprise,
o.ec_smevalidated AS ecsmevalidated,
o.ec_nutscode AS ecnutscode,
o.dateofcollection AS dateofcollection,
o.lastupdate AS dateoftransformation,
false AS inferred,
false AS deletedbyinference,
o.trust AS trust,
'' AS inferenceprovenance,
d.id AS collectedfromid,
d.officialname AS collectedfromname,
o.country || '@@@dnet:countries' AS country,
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
ARRAY[]::text[] AS pid
FROM dsm_organizations o
LEFT OUTER JOIN dsm_datasources d ON (d.id = o.collectedfrom)
WHERE o.id like 'corda__%'
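For reference, a plain-JDBC sketch of consuming a result set shaped like this query's output; the project actually routes queryCordaOrganizations.sql through DbClient and the processOrganization callback, and the SQL here is trimmed to a few columns while the connection details are placeholders.

import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Illustrative plain-JDBC consumption; requires a PostgreSQL JDBC driver on the
// classpath, and URL, user and password are placeholders.
public class CordaOrgsQuerySketch {
    public static void main(final String[] args) throws Exception {
        final String sql = "SELECT o.id AS organizationid, o.legalname AS legalname, ARRAY[]::text[] AS pid"
            + " FROM dsm_organizations o WHERE o.id LIKE 'corda__%'";

        try (Connection con = DriverManager.getConnection("jdbc:postgresql://localhost:5432/dnet", "dnet", "***");
            Statement st = con.createStatement();
            ResultSet rs = st.executeQuery(sql)) {
            while (rs.next()) {
                final Array pidArray = rs.getArray("pid"); // text[] column, empty for CORDA orgs
                final int nPids = pidArray == null ? 0 : ((String[]) pidArray.getArray()).length;
                System.out.println(rs.getString("organizationid") + " | " + rs.getString("legalname") + " | pids=" + nPids);
            }
        }
    }
}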