diff --git a/dhp-workflows/dhp-broker-events/README.md b/dhp-workflows/dhp-broker-events/README.md new file mode 100644 index 0000000000..bee6e99951 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/README.md @@ -0,0 +1,3 @@ +# dhp-broker-events +dhp-broker-events is a DNET module responsible +of the production of events for the OpenAIRE Broker Service. diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml new file mode 100644 index 0000000000..a57c4ba25f --- /dev/null +++ b/dhp-workflows/dhp-broker-events/pom.xml @@ -0,0 +1,66 @@ + + + + dhp-workflows + eu.dnetlib.dhp + 1.1.7-SNAPSHOT + + 4.0.0 + + dhp-broker-events + + + + + commons-io + commons-io + + + + org.apache.spark + spark-core_2.11 + + + org.apache.spark + spark-sql_2.11 + + + org.apache.spark + spark-hive_2.11 + test + + + + eu.dnetlib.dhp + dhp-common + ${project.version} + + + eu.dnetlib.dhp + dhp-schemas + ${project.version} + + + + com.jayway.jsonpath + json-path + + + dom4j + dom4j + + + jaxen + jaxen + + + + eu.dnetlib + dnet-openaire-broker-common + [1.0.0,2.0.0) + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java index f5ac56b787..55123cbf32 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java @@ -51,7 +51,7 @@ import eu.dnetlib.dhp.schema.oaf.Software; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; public class MigrateDbEntitiesApplication extends AbstractMigrationApplication - implements Closeable { + implements Closeable { private static final Log log = LogFactory.getLog(MigrateDbEntitiesApplication.class); @@ -61,10 +61,9 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - MigrateDbEntitiesApplication.class - .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_db_entities_parameters.json"))); + IOUtils + .toString(MigrateDbEntitiesApplication.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_db_entities_parameters.json"))); parser.parseArgument(args); @@ -77,7 +76,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication final boolean processClaims = parser.get("action") != null && parser.get("action").equalsIgnoreCase("claims"); try (final MigrateDbEntitiesApplication smdbe = new MigrateDbEntitiesApplication(hdfsPath, dbUrl, dbUser, - dbPassword)) { + dbPassword)) { if (processClaims) { log.info("Processing claims..."); smdbe.execute("queryClaims.sql", smdbe::processClaims); @@ -108,15 +107,15 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication } public MigrateDbEntitiesApplication( - final String hdfsPath, final String dbUrl, final String dbUser, final String dbPassword) - throws Exception { + final String hdfsPath, final String dbUrl, final String dbUser, final String dbPassword) + throws Exception { super(hdfsPath); this.dbClient = new DbClient(dbUrl, dbUser, dbPassword); this.lastUpdateTimestamp = new Date().getTime(); } public void execute(final String sqlFile, final Function> producer) - throws Exception { + throws Exception { final String sql = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/sql/" + sqlFile)); final Consumer consumer = rs -> producer.apply(rs).forEach(oaf -> emitOaf(oaf)); @@ -135,10 +134,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication ds.setId(createOpenaireId(10, rs.getString("datasourceid"), true)); ds.setOriginalId(Arrays.asList(rs.getString("datasourceid"))); ds - .setCollectedfrom( - listKeyValues( - createOpenaireId(10, rs.getString("collectedfromid"), true), - rs.getString("collectedfromname"))); + .setCollectedfrom(listKeyValues(createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"))); ds.setPid(new ArrayList<>()); ds.setDateofcollection(asString(rs.getDate("dateofcollection"))); ds.setDateoftransformation(null); // Value not returned by the SQL query @@ -179,7 +175,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication ds.setCertificates(field(rs.getString("certificates"), info)); ds.setPolicies(new ArrayList<>()); // The sql query returns an empty array ds - .setJournal(prepareJournal(rs.getString("officialname"), rs.getString("journal"), info)); // Journal + .setJournal(prepareJournal(rs.getString("officialname"), rs.getString("journal"), info)); // Journal ds.setDataInfo(info); ds.setLastupdatetimestamp(lastUpdateTimestamp); @@ -199,10 +195,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication p.setId(createOpenaireId(40, rs.getString("projectid"), true)); p.setOriginalId(Arrays.asList(rs.getString("projectid"))); p - .setCollectedfrom( - listKeyValues( - createOpenaireId(10, rs.getString("collectedfromid"), true), - rs.getString("collectedfromname"))); + .setCollectedfrom(listKeyValues(createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"))); p.setPid(new ArrayList<>()); p.setDateofcollection(asString(rs.getDate("dateofcollection"))); p.setDateoftransformation(asString(rs.getDate("dateoftransformation"))); @@ -219,7 +212,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication p.setDuration(field(Integer.toString(rs.getInt("duration")), info)); p.setEcsc39(field(Boolean.toString(rs.getBoolean("ecsc39")), info)); p - .setOamandatepublications(field(Boolean.toString(rs.getBoolean("oamandatepublications")), info)); + .setOamandatepublications(field(Boolean.toString(rs.getBoolean("oamandatepublications")), info)); p.setEcarticle29_3(field(Boolean.toString(rs.getBoolean("ecarticle29_3")), info)); p.setSubjects(prepareListOfStructProps(rs.getArray("subjects"), info)); p.setFundingtree(prepareListFields(rs.getArray("fundingtree"), info)); @@ -256,10 +249,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication o.setId(createOpenaireId(20, rs.getString("organizationid"), true)); o.setOriginalId(Arrays.asList(rs.getString("organizationid"))); o - .setCollectedfrom( - listKeyValues( - createOpenaireId(10, rs.getString("collectedfromid"), true), - rs.getString("collectedfromname"))); + .setCollectedfrom(listKeyValues(createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"))); o.setPid(new ArrayList<>()); o.setDateofcollection(asString(rs.getDate("dateofcollection"))); o.setDateoftransformation(asString(rs.getDate("dateoftransformation"))); @@ -274,14 +264,12 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication o.setEclegalperson(field(Boolean.toString(rs.getBoolean("eclegalperson")), info)); o.setEcnonprofit(field(Boolean.toString(rs.getBoolean("ecnonprofit")), info)); o - .setEcresearchorganization(field(Boolean.toString(rs.getBoolean("ecresearchorganization")), info)); + .setEcresearchorganization(field(Boolean.toString(rs.getBoolean("ecresearchorganization")), info)); o.setEchighereducation(field(Boolean.toString(rs.getBoolean("echighereducation")), info)); o - .setEcinternationalorganizationeurinterests( - field(Boolean.toString(rs.getBoolean("ecinternationalorganizationeurinterests")), info)); + .setEcinternationalorganizationeurinterests(field(Boolean.toString(rs.getBoolean("ecinternationalorganizationeurinterests")), info)); o - .setEcinternationalorganization( - field(Boolean.toString(rs.getBoolean("ecinternationalorganization")), info)); + .setEcinternationalorganization(field(Boolean.toString(rs.getBoolean("ecinternationalorganization")), info)); o.setEcenterprise(field(Boolean.toString(rs.getBoolean("ecenterprise")), info)); o.setEcsmevalidated(field(Boolean.toString(rs.getBoolean("ecsmevalidated")), info)); o.setEcnutscode(field(Boolean.toString(rs.getBoolean("ecnutscode")), info)); @@ -300,8 +288,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication final DataInfo info = prepareDataInfo(rs); final String orgId = createOpenaireId(20, rs.getString("organization"), true); final String dsId = createOpenaireId(10, rs.getString("datasource"), true); - final List collectedFrom = listKeyValues( - createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname")); + final List collectedFrom = listKeyValues(createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname")); final Relation r1 = new Relation(); r1.setRelType("datasourceOrganization"); @@ -334,8 +321,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication final DataInfo info = prepareDataInfo(rs); final String orgId = createOpenaireId(20, rs.getString("resporganization"), true); final String projectId = createOpenaireId(40, rs.getString("project"), true); - final List collectedFrom = listKeyValues( - createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname")); + final List collectedFrom = listKeyValues(createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname")); final Relation r1 = new Relation(); r1.setRelType("projectOrganization"); @@ -365,12 +351,10 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication public List processClaims(final ResultSet rs) { - final DataInfo info = dataInfo( - false, null, false, false, - qualifier("user:claim", "user:claim", "dnet:provenanceActions", "dnet:provenanceActions"), "0.9"); + final DataInfo info = + dataInfo(false, null, false, false, qualifier("user:claim", "user:claim", "dnet:provenanceActions", "dnet:provenanceActions"), "0.9"); - final List collectedFrom = listKeyValues( - createOpenaireId(10, "infrastruct_::openaire", true), "OpenAIRE"); + final List collectedFrom = listKeyValues(createOpenaireId(10, "infrastruct_::openaire", true), "OpenAIRE"); try { @@ -456,15 +440,11 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication final String inferenceprovenance = rs.getString("inferenceprovenance"); final Boolean inferred = rs.getBoolean("inferred"); final String trust = rs.getString("trust"); - return dataInfo( - deletedbyinference, inferenceprovenance, inferred, false, - MigrationConstants.ENTITYREGISTRY_PROVENANCE_ACTION, trust); + return dataInfo(deletedbyinference, inferenceprovenance, inferred, false, MigrationConstants.ENTITYREGISTRY_PROVENANCE_ACTION, trust); } private Qualifier prepareQualifierSplitting(final String s) { - if (StringUtils.isBlank(s)) { - return null; - } + if (StringUtils.isBlank(s)) { return null; } final String[] arr = s.split("@@@"); return arr.length == 4 ? qualifier(arr[0], arr[1], arr[2], arr[3]) : null; } @@ -478,23 +458,19 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication } private StructuredProperty prepareStructProp(final String s, final DataInfo dataInfo) { - if (StringUtils.isBlank(s)) { - return null; - } + if (StringUtils.isBlank(s)) { return null; } final String[] parts = s.split("###"); if (parts.length == 2) { final String value = parts[0]; final String[] arr = parts[1].split("@@@"); - if (arr.length == 4) { - return structuredProperty(value, arr[0], arr[1], arr[2], arr[3], dataInfo); - } + if (arr.length == 4) { return structuredProperty(value, arr[0], arr[1], arr[2], arr[3], dataInfo); } } return null; } private List prepareListOfStructProps( - final Array array, - final DataInfo dataInfo) throws SQLException { + final Array array, + final DataInfo dataInfo) throws SQLException { final List res = new ArrayList<>(); if (array != null) { for (final String s : (String[]) array.getArray()) { @@ -514,9 +490,8 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication if (arr.length == 3) { final String issn = StringUtils.isNotBlank(arr[0]) ? arr[0].trim() : null; final String eissn = StringUtils.isNotBlank(arr[1]) ? arr[1].trim() : null; - ; final String lissn = StringUtils.isNotBlank(arr[2]) ? arr[2].trim() : null; - ; + if (issn != null || eissn != null || lissn != null) { return journal(name, issn, eissn, eissn, null, null, null, null, null, null, null, info); } diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml index 1645129b10..ea34339032 100644 --- a/dhp-workflows/pom.xml +++ b/dhp-workflows/pom.xml @@ -26,6 +26,7 @@ dhp-dedup-scholexplorer dhp-graph-provision-scholexplorer dhp-stats-update + dhp-broker-events