From 3d496684313b7de52035a6b84755e001493e6be8 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 4 Nov 2022 17:36:09 +0100 Subject: [PATCH] [Subject Propagation] test and workflow definition --- .../null/publication/._SUCCESS.crc | Bin 8 -> 0 bytes ...cb-470f-b7ad-68b08e2882ec-c000.json.gz.crc | Bin 48 -> 0 bytes .../dhp-enrichment/null/publication/_SUCCESS | 0 ...2-40cb-470f-b7ad-68b08e2882ec-c000.json.gz | Bin 4714 -> 0 bytes .../eu/dnetlib/dhp/PropagationConstant.java | 33 ++++++++-- .../PrepareResultResultStep1.java | 16 +++-- .../SparkSubjectPropagationStep2.java | 48 +++++++++------ ...put_preparesubjecttoresult_parameters.json | 2 +- .../input_propagatesubject_parameters.json | 4 +- .../oozie_app/config-default.xml | 2 +- .../subjectpropagation/oozie_app/workflow.xml | 58 +++++++----------- .../SubjectPreparationJobTest.java | 2 +- .../SubjectPropagationJobTest.java | 2 +- 13 files changed, 99 insertions(+), 68 deletions(-) delete mode 100644 dhp-workflows/dhp-enrichment/null/publication/._SUCCESS.crc delete mode 100644 dhp-workflows/dhp-enrichment/null/publication/.part-00000-d0707c22-40cb-470f-b7ad-68b08e2882ec-c000.json.gz.crc delete mode 100644 dhp-workflows/dhp-enrichment/null/publication/_SUCCESS delete mode 100644 dhp-workflows/dhp-enrichment/null/publication/part-00000-d0707c22-40cb-470f-b7ad-68b08e2882ec-c000.json.gz diff --git a/dhp-workflows/dhp-enrichment/null/publication/._SUCCESS.crc b/dhp-workflows/dhp-enrichment/null/publication/._SUCCESS.crc deleted file mode 100644 index 3b7b044936a890cd8d651d349a752d819d71d22c..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 8 PcmYc;N@ieSU}69O2$TUk diff --git a/dhp-workflows/dhp-enrichment/null/publication/.part-00000-d0707c22-40cb-470f-b7ad-68b08e2882ec-c000.json.gz.crc b/dhp-workflows/dhp-enrichment/null/publication/.part-00000-d0707c22-40cb-470f-b7ad-68b08e2882ec-c000.json.gz.crc deleted file mode 100644 index 24a358fe2ef5e6c14d29676fc594795138aeccce..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 48 zcmYc;N@ieSU}8whoSoM(DO0-d`{FFFT_+eH+6y_Crp+l;xVfQhbNf}z8FtSFy~G9i E0lCl;(*OVf diff --git a/dhp-workflows/dhp-enrichment/null/publication/_SUCCESS b/dhp-workflows/dhp-enrichment/null/publication/_SUCCESS deleted file mode 100644 index e69de29bb..000000000 diff --git a/dhp-workflows/dhp-enrichment/null/publication/part-00000-d0707c22-40cb-470f-b7ad-68b08e2882ec-c000.json.gz b/dhp-workflows/dhp-enrichment/null/publication/part-00000-d0707c22-40cb-470f-b7ad-68b08e2882ec-c000.json.gz deleted file mode 100644 index 768f68d92df8f70eedbd68f24fe821073db26d3b..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 4714 zcmV-w5|!;AiwFP!000000PS7fZrjL`zTc;42tJzxE6F7FFJLabc7BW#$42Ct0ZvX3 zw8*B^G0En1H|2P80_^2tZ}$cEVi!1hfqenro~PI+*{W_*qJAvf6MGV87zDORc6U{M zRbBnVPwkI2n(3*Kwc(qOwJ?jKgvP8kti5D2ngpy)PG}a@>b3lKewFe7D!i@NBu&n0 zL$_X2nOv?}`vi~2Olj)-Oex7HQ(YT=toadD3ai(SPoBPlw#<#Z?s3WzvS%v56Xj1? z%uLxJVR~p=dKZg;Egt7J*9&}trbQGnU$Y>TA~qJDu?r~dI)4)&iE_!_!N1|K+xOaI zZ_xJ#-7y_FUD{$@+H*RC77fF3*a2xX8f7TxY08z)S>mIJwsx==h$h15e|3Z>3zbT*VM6^et37c1z$pmGKd199uvFU0H_F`FH*UJAURWu^e1 zn$sNMWdY>A5HjEi#d4&CD&aKFk<9?jivrmhlXSxDRfX$1u7C*>A$EaOTT83>+ZKPG ztLl+}>?R48Oj*UNj((CrVsZqx0W z3QYNur?y;`7Ph+OXGYiu8uNq^J!NF)mml060s{)!(LXplM5WB2-RqsCdd$1b3hZNDi6%|NiYivovM00hctHWD#9F zBj;RC34;bM$W(|kqOy1)SR~*l@ce|&7-->(c#I>1kmza-1FTFX)3gB3n3`j;Nl!py z5jO@SQ^gn1X;dnb083*8-6Vj>`^28_)9Zvwl%FmPn zBcL~#E{PREOfG3hlOG7gv5^i!6o;ObXbX>?a<$cXBL`~vQ5G;%Wek1}ArNx0Xv18t zWFhn-=!yD99fg47O#?VtvZ?@Dj7ooMaTK5nHZ0tiH& zSZhD+3{lf51>~Vk2)^Ryg6Zw6;UR=NBPxj6ff zcQ(igHb+BfMzkDRfyqTh$OSgz!e{HtCfR3Nv!x2qGGq@%e#uN*>hd!LI@T;QTq-h2 zqDRc8HWZMjG^nA9JO`=)Lge!EghGF~{4Z>((1T1aKPS+v%};sayZj8@9Xbh(FaNF~ zdWtlSrkO1L1VY%zO%#iUbEHx@-coD!F$7n6U2(02yDH?M58Gb_elNqciL6;_hm%SeuE&$y%jv7V_2tP<}C6)NS>#C zG!auTWny^A98VLiy z<=s2{O=!84gr+o$I=W3 zH2^KowWI`485hKLuAr%Tl{J_!6!s&PMWD69=?(@Z_-ss}fUy9P#Ox10t>d#~p+Thc zcy5}NCRoKJOp9K#d9HtBZ_9i@z)L&V&p*RxvRL+;#M^QH=5+laXoX?B=QzXRpw;gT z#(~#!J=bqB*7LiJwa4Mu?F}4%OwEX)^0gYQ3clh-35^T@njHoJ84o$D_Ajm)5pm9% zBUIKj5PU_Q13uvzrf|n&jA>GWc7u96c&9JYE8MksS{jjEr`hg$gZ^$1YhN|JLEFi< zO_nyj?tIUy+G+U0od%fkn>|9P6$!uCJpd_Fp$u&I;q$Y=QyC)GCq4YQEoyf z8%NwK{SL-j7>|9n8K6xCKb5zRO|nIguPEcJDD&g1KfgNu`PBwxB*V4Hy9kz*(PlR- zmjLmZj6i&Cs&%S{yLsPX3#ghxPIgUSdW)SY!dZ~nb}TE&lF5pWPm@W+)pUc7MGKqt zTRi=Q!j#TZ>|L72xN{z-wV~VX^aq1>tKD}8PQ4~5FL#}fHCQH2cydtQVftwEIYJ8& z2|Z^FORFDWzXYMi?mxBm{^UJ_|IpZ~?Ql2W;mYNgqZdNdcANuvzHNt$mxd8kdAK5F zXmB4Ioa?n1)_0gcUI8#>#+xe&{y9HCZ<-zeYdk*7-E+Kmxa&2YU1<1S;P2Kx0d^J! z8XaPE{id<_8a3Z8g>Hl|IdouPuy<^PulKG+^e)A8a+ZkmWJA0l5c^!S?Y(Qg)F-0^ z%xuOcQ0%YM;;N03BMwQ*Gqvaxl+{}V*AK&I2yeg61UauH2O}*sdgc* z!Rr9JrvyaE5!KKR1g@{hKjRLsfZGts5!n}U3d=D`Mz$kYm|lbF)8(93;dKpOi!p&T z$ZrV9I$4aUb#icYx<4W(Qur_x;>kpjnTVQX?-+`;WIzrRyE5Zu+kRgZh~wtRGhC2H zvqc@H&FMY2^&kweUx47C4*j%pPh{M_Ko-_W~rbuvoxvF^zD{J1Qz!`Sqc9)Hg?ar!SiLqp^Sdc4->$27XW9 z?kF% z^31b2d1k*vo`H4OkB0nox*$Its~#-==zQ!|63V z=cc@E&uzAE%-i<7VasWH{eDHQmf}Uk zK5%_eC#ggNSqtWEEte5oFvb3B3H};L!&my5Y`3;B~biO6Au1?IfG?Bqc~%zwIyT}!okQh zjUrxzB`}Maf|3TxRPvakAj?>XX(!qRP@2GwsHKt#k|hC5VuD=zCK;s&Vt}mSoM#Mf zl`um%XC&o`;d)t?bO9TJkH$+o*2MgXmkhe>vV3x`Ks^0LKLf+6<9^Upcr> z`k){5damntJ3+_o_dA2o4f-&f?K01AcY;B`R?!C^;&A!R(Fdd0R}HuKxHW7&!MRmM zBK)S}<%(%Tl)ANc)nr@JusF~KAf6?+zw}c2^~&qkz5LCiWkc*0 zY=}{r4KXroh^?%^pQ#M$7wz{oo)2MQY#fluZ@JRv(r?~)xP=U|m2i^uQ; zhU17)WX5sJAA<=Qwo9ZCcnoCD@)?6W^G1s^EzLockCT6KpX?|NwH1@7e3Lx=CF7v3 zId0Xe2h!KYnud_I;OAA$AvN&Z?<1c(h9`$H(8WYU;woJ3sohOb{Sxlqo!~kC8CSe4Q1HyhE#6 zYL4SPxAZ6EW*Ys=4de(J)048t6!QPLUXh0;8!g5ekF-5DmL~yAk)nq*PI#J3K!M{7 znT7b`PZ?5Z0LsrSk!vgr*AY*2gSnN$H3d%IX+on5yeP3XbM4F{#m)5`O>1RpPJYAh zWEjdJXc_TeS^20{Um@oZ_lQ(|`zg5aJf#{5PxIicigNZ%MamTur;2j+?NH7(pIc}8(l>T)U1#97ystwy zn@8{U=hn?9{5L$g&LI7}FwSl;8egMkMLDY|XZJ-pt4^-FAzszVbrl8eYvZ*T6Mp;2 zb;X!^^~rSy^{t22?Uh#Pd-GL#MQ8i%E4WR^(S2Vpxc^xG{013J^X;ek9%OYU-NW~4 zc^|iO=Yx0W=H2Ydu@By{tFId>mp*ux(o75U*Q)HhA_8SD5)pG6RnC3z&Mlu0IlA(E zNPGc20vpZwar1+_^YxHDDRNYu68e2b!&f>jw2FbR9Rt-V!GB!eFZ%!f^Q-#*^0&X9 zGZ8Z_FH8o2N)<)?0a3)u8=NGYJmF8(kyc9fjBnUnCZV{=!G7X1nKI%SbM2v^zt$-*jDXosoQpw82)V!@8ZW*BSTw!=cmewtKz6_wX+j`kl~i zcUo?@OKEQ$j(Z(eyD4q(cgj4Qxr4VLqTOXis`}6u_MtB!r`@e1w-VOcE0~qvjoO!= z-#v8=a*=Azv~GrcmnFL;rPXfRb6Z_Db{gG5H)yo`UAHmr^m~ogxZQDSyGsXddmF(m zhSh%xhFQRfM@aAFoPFHv(%ng5wu_ctlPCsE~dj?*UXD%E804%m0RTDhu!^WG>;^L{b`Z1#=lnE+i}}&+s#ps*@Lxad~pa;UnZdJi-`0 sd4pZ2vkSr+q~+uvn-$;Yd*<6zyG0LXw`fD}H#) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); } + public static Dataset readOafKryoPath( + SparkSession spark, String inputPath, Class clazz) { + return spark + .read() + .textFile(inputPath) + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.kryo(clazz)); + } + + public static Class[] getModelClasses() { + List> modelClasses = Lists.newArrayList(ModelSupport.getOafModelClasses()); + modelClasses + .addAll( + Lists + .newArrayList( + Result.class, + Qualifier.class, + DataInfo.class, + Publication.class, + eu.dnetlib.dhp.schema.oaf.Dataset.class, + Software.class, + OtherResearchProduct.class, + Subject.class, + AccessRight.class)); + return modelClasses.toArray(new Class[] {}); + } + } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/subjecttoresultfromsemrel/PrepareResultResultStep1.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/subjecttoresultfromsemrel/PrepareResultResultStep1.java index 0a82c3981..f35ad52e1 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/subjecttoresultfromsemrel/PrepareResultResultStep1.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/subjecttoresultfromsemrel/PrepareResultResultStep1.java @@ -70,7 +70,7 @@ public class PrepareResultResultStep1 implements Serializable { final List allowedSemRel = Arrays .asList( - parser.get("allowedSemRel").split(";")) + parser.get("allowedsemrels").split(";")) .stream() .map(s -> s.toLowerCase()) .collect(Collectors.toList()); @@ -98,7 +98,7 @@ public class PrepareResultResultStep1 implements Serializable { Dataset result = readPath(spark, inputPath + "/" + resultType, resultClazz) .filter( (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() && - !r.getDataInfo().getInvisible() && + !r.getDataInfo().getInvisible() && Optional.ofNullable(r.getSubject()).isPresent() && r .getSubject() .stream() @@ -116,22 +116,28 @@ public class PrepareResultResultStep1 implements Serializable { (MapGroupsFunction, ResultSubjectList>) (k, it) -> getResultSubjectList(subjectClassList, k, it), Encoders.bean(ResultSubjectList.class)) + .filter(Objects::nonNull) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(outputPath + "/" + resultType); } - @NotNull private static ResultSubjectList getResultSubjectList(List subjectClassList, String k, Iterator> it) { + Tuple2 first = it.next(); + if (!Optional.ofNullable(first._1()).isPresent()) { + return null; + } ResultSubjectList rsl = new ResultSubjectList(); rsl.setResId(k); - Tuple2 first = it.next(); List sbjInfo = new ArrayList<>(); Set subjectSet = new HashSet<>(); extracted(subjectClassList, first._1().getSubject(), sbjInfo, subjectSet); - it.forEachRemaining(t2 -> extracted(subjectClassList, t2._1().getSubject(), sbjInfo, subjectSet)); + it.forEachRemaining(t2 -> { + if (Optional.ofNullable(t2._1()).isPresent()) + extracted(subjectClassList, t2._1().getSubject(), sbjInfo, subjectSet); + }); rsl.setSubjectList(sbjInfo); return rsl; } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/subjecttoresultfromsemrel/SparkSubjectPropagationStep2.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/subjecttoresultfromsemrel/SparkSubjectPropagationStep2.java index d546a8d8f..2a3bcff51 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/subjecttoresultfromsemrel/SparkSubjectPropagationStep2.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/subjecttoresultfromsemrel/SparkSubjectPropagationStep2.java @@ -50,6 +50,7 @@ public class SparkSubjectPropagationStep2 implements Serializable { final String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); + final String resultClassName = parser.get("resultTableName"); log.info("resultTableName: {}", resultClassName); @@ -58,14 +59,15 @@ public class SparkSubjectPropagationStep2 implements Serializable { final String resultType = parser.get("resultType"); log.info("resultType: {}", resultType); - final String inputPath = parser.get("inputPath"); + final String inputPath = parser.get("sourcePath"); log.info("inputPath: {}", inputPath); final String workingPath = parser.get("workingPath"); log.info("workingPath: {}", workingPath); SparkConf conf = new SparkConf(); - + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(getModelClasses()); runWithSparkSession( conf, isSparkSessionManaged, @@ -83,7 +85,11 @@ public class SparkSubjectPropagationStep2 implements Serializable { Class resultClazz, String resultType) { - Dataset results = readPath(spark, inputPath + "/" + resultType, resultClazz); + Dataset> results = readOafKryoPath(spark, inputPath + "/" + resultType, resultClazz) + .map( + (MapFunction>) r -> new Tuple2(r.getId(), r), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(resultClazz))); + Dataset preparedResult = readPath( spark, preparedPath + "/publication", ResultSubjectList.class) .union(readPath(spark, preparedPath + "/dataset", ResultSubjectList.class)) @@ -93,20 +99,26 @@ public class SparkSubjectPropagationStep2 implements Serializable { results .joinWith( preparedResult, - results.col("id").equalTo(preparedResult.col("resId")), + results.col("_1").equalTo(preparedResult.col("resId")), "left") - .map((MapFunction, R>) t2 -> { - R res = t2._1(); + .map((MapFunction, ResultSubjectList>, String>) t2 -> { + R res = t2._1()._2(); + // estraggo le tipologie di subject dal result + Map> resultMap = new HashMap<>(); if (Optional.ofNullable(t2._2()).isPresent()) { - // estraggo le tipologie di subject dal result - Map> resultMap = new HashMap<>(); - res.getSubject().stream().forEach(s -> { - String cid = s.getQualifier().getClassid(); - if (!resultMap.containsKey(cid)) { - resultMap.put(cid, new ArrayList<>()); - } - resultMap.get(cid).add(s.getValue()); - }); + if(Optional.ofNullable(res.getSubject()).isPresent()){ + res.getSubject().stream().forEach(s -> { + String cid = s.getQualifier().getClassid(); + if(!cid.equals(ModelConstants.DNET_SUBJECT_KEYWORD)){ + if (!resultMap.containsKey(cid)) { + resultMap.put(cid, new ArrayList<>()); + } + resultMap.get(cid).add(s.getValue()); + } + }); + }else{ + res.setSubject(new ArrayList<>()); + } // Remove from the list all the subjects with the same class already present in the result List distinctClassId = t2 @@ -142,12 +154,12 @@ public class SparkSubjectPropagationStep2 implements Serializable { } } - return res; - }, Encoders.bean(resultClazz)) + return OBJECT_MAPPER.writeValueAsString(res); + }, Encoders.STRING()) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(workingPath + "/" + resultType); + .text(workingPath + "/" + resultType); readPath(spark, workingPath + "/" + resultType, resultClazz) .write() diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/input_preparesubjecttoresult_parameters.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/input_preparesubjecttoresult_parameters.json index a8ec1d5b3..1e3ac1af4 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/input_preparesubjecttoresult_parameters.json +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/input_preparesubjecttoresult_parameters.json @@ -3,7 +3,7 @@ { "paramName":"asr", - "paramLongName":"allowedSemRel", + "paramLongName":"allowedsemrels", "paramDescription": "the set of semantic relations between the results to be exploited to perform the propagation", "paramRequired": true }, diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/input_propagatesubject_parameters.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/input_propagatesubject_parameters.json index 0cb51c598..76942cbe6 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/input_propagatesubject_parameters.json +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/input_propagatesubject_parameters.json @@ -13,8 +13,8 @@ "paramRequired": true }, { - "paramName":"ip", - "paramLongName":"inputPath", + "paramName":"sp", + "paramLongName":"sourcePath", "paramDescription": "the path of the input graph", "paramRequired": true }, diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/oozie_app/config-default.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/oozie_app/config-default.xml index caf3c6050..0ce8cef58 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/oozie_app/config-default.xml @@ -48,7 +48,7 @@ sparkExecutorMemory - 6G + 10G sparkExecutorCores diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/oozie_app/workflow.xml index b7f48a4e0..b16a1b00f 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/oozie_app/workflow.xml @@ -1,12 +1,4 @@ - - - - - - - - sourcePath @@ -16,14 +8,6 @@ subjectlist the list of subject classid to propagate (split by ;) - - resultType - the result tapy - - - resultTableName - the class of the result - allowedsemrels the allowed semantics @@ -64,14 +48,14 @@ - + yarn cluster - PrepareProjectResultsAssociation + PrepareSubjectResultsAssociation eu.dnetlib.dhp.subjecttoresultfromsemrel.PrepareResultResultStep1 dhp-enrichment-${projectVersion}.jar @@ -98,7 +82,7 @@ yarn cluster - PrepareProjectResultsAssociation + PrepareSubjectResultsAssociation eu.dnetlib.dhp.subjecttoresultfromsemrel.PrepareResultResultStep1 dhp-enrichment-${projectVersion}.jar @@ -125,7 +109,7 @@ yarn cluster - PrepareProjectResultsAssociation + PrepareSubjectResultsAssociation eu.dnetlib.dhp.subjecttoresultfromsemrel.PrepareResultResultStep1 dhp-enrichment-${projectVersion}.jar @@ -152,7 +136,7 @@ yarn cluster - PrepareProjectResultsAssociation + PrepareSubjectResultsAssociation eu.dnetlib.dhp.subjecttoresultfromsemrel.PrepareResultResultStep1 dhp-enrichment-${projectVersion}.jar @@ -188,12 +172,12 @@ yarn cluster - ProjectToResultPropagation - eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob + SubjectToResultPropagation + eu.dnetlib.dhp.subjecttoresultfromsemrel.SparkSubjectPropagationStep2 dhp-enrichment-${projectVersion}.jar --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} + --executor-memory=8G --driver-memory=${sparkDriverMemory} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} @@ -201,8 +185,9 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + --conf spark.sql.shuffle.partitions=3840 - --inputPath${sourcePath} + --sourcePath${sourcePath} --outputPath${outputPath} --workingPath${workingDir}/working --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication @@ -217,8 +202,8 @@ yarn cluster - ProjectToResultPropagation - eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob + SubjectToResultPropagation + eu.dnetlib.dhp.subjecttoresultfromsemrel.SparkSubjectPropagationStep2 dhp-enrichment-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -230,8 +215,9 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + --conf spark.sql.shuffle.partitions=3840 - --inputPath${sourcePath} + --sourcePath${sourcePath} --outputPath${outputPath} --workingPath${workingDir}/working --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct @@ -246,8 +232,8 @@ yarn cluster - ProjectToResultPropagation - eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob + SubjectToResultPropagation + eu.dnetlib.dhp.subjecttoresultfromsemrel.SparkSubjectPropagationStep2 dhp-enrichment-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -259,8 +245,9 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + --conf spark.sql.shuffle.partitions=3840 - --inputPath${sourcePath} + --sourcePath${sourcePath} --outputPath${outputPath} --workingPath${workingDir}/working --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset @@ -275,8 +262,8 @@ yarn cluster - ProjectToResultPropagation - eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob + SubjectToResultPropagation + eu.dnetlib.dhp.subjecttoresultfromsemrel.SparkSubjectPropagationStep2 dhp-enrichment-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -288,8 +275,9 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + --conf spark.sql.shuffle.partitions=3840 - --inputPath${sourcePath} + --sourcePath${sourcePath} --outputPath${outputPath} --workingPath${workingDir}/working --resultTableNameeu.dnetlib.dhp.schema.oaf.Software @@ -300,7 +288,7 @@ - + diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/subjectpropagation/SubjectPreparationJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/subjectpropagation/SubjectPreparationJobTest.java index f782feab9..0b3b45d7e 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/subjectpropagation/SubjectPreparationJobTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/subjectpropagation/SubjectPreparationJobTest.java @@ -81,7 +81,7 @@ public class SubjectPreparationJobTest { PrepareResultResultStep1 .main( new String[] { - "-allowedSemRel", + "-allowedsemrels", "IsSupplementedBy;IsSupplementTo;IsPreviousVersionOf;IsNewVersionOf;IsIdenticalTo;Obsoletes;IsObsoletedBy;IsVersionOf", "-subjectlist", "fos;sdg", "-resultType", "publication", diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/subjectpropagation/SubjectPropagationJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/subjectpropagation/SubjectPropagationJobTest.java index b324b49d8..48c425bbc 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/subjectpropagation/SubjectPropagationJobTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/subjectpropagation/SubjectPropagationJobTest.java @@ -76,7 +76,7 @@ public class SubjectPropagationJobTest { .getResource("/eu/dnetlib/dhp/subjectpropagation/preparedInfo") .getPath(), "-resultType", "publication", - "-inputPath", getClass() + "-sourcePath", getClass() .getResource("/eu/dnetlib/dhp/subjectpropagation") .getPath(), "-isSparkSessionManaged", Boolean.FALSE.toString(),