diff --git a/README.md b/README.md index 2189451..8bb6968 100644 --- a/README.md +++ b/README.md @@ -69,3 +69,20 @@ pyrest output is as follows: } ``` +## Run java worker example + +Run + +``` +./upload-workflow.sh test_myworker_workflow.json +``` + +Increase the number stored by the init task in the variable cycles in order cause an exception (values > 7). + +To start the my_worker implementing my_task enter directory jworker and run: + +``` +mvn clean compile exec:java +``` + +Access the UI and from the workbench run a test_myworker workflow. You should be able to see log information during execution of the task updating at every second. diff --git a/jworker/pom.xml b/jworker/pom.xml new file mode 100644 index 0000000..381de24 --- /dev/null +++ b/jworker/pom.xml @@ -0,0 +1,85 @@ + + + + 4.0.0 + + com.nubisware + jworker + 1.0.0 + + jworker + + http://www.example.com + + + UTF-8 + 1.7 + 1.7 + + + + + junit + junit + 4.11 + test + + + org.glassfish + javax.json + 1.0.2 + + + com.squareup.okhttp3 + okhttp + 4.9.0 + + + + + + + + + maven-clean-plugin + 3.1.0 + + + + maven-resources-plugin + 3.0.2 + + + maven-compiler-plugin + 3.8.0 + + + maven-surefire-plugin + 2.22.1 + + + maven-jar-plugin + 3.0.2 + + + maven-install-plugin + 2.5.2 + + + maven-deploy-plugin + 2.8.2 + + + + maven-site-plugin + 3.7.1 + + + maven-project-info-reports-plugin + 3.0.0 + + + + + diff --git a/jworker/src/main/java/com/nubisware/MyWorker.java b/jworker/src/main/java/com/nubisware/MyWorker.java new file mode 100644 index 0000000..ea38420 --- /dev/null +++ b/jworker/src/main/java/com/nubisware/MyWorker.java @@ -0,0 +1,156 @@ +package com.nubisware; + +import java.io.StringReader; +import java.util.ArrayList; +import java.util.UUID; + +import javax.json.Json; +import javax.json.JsonArray; +import javax.json.JsonArrayBuilder; +import javax.json.JsonObject; + +import okhttp3.HttpUrl; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; + +public class MyWorker { + + protected static final MediaType JSON = MediaType.parse("application/json; charset=utf-8"); + + protected static final String taskdefUrl = "/api/metadata/taskdefs"; + protected static final String taskPollUrl = "/api/tasks/poll"; + protected static final String taskUpdateUrl = "/api/tasks"; + + final protected String url; + final protected String taskName; + final protected UUID workerId; + + protected OkHttpClient client; + + public MyWorker(String url, String taskName){ + this.url = url; + this.taskName = taskName; + this.workerId = UUID.randomUUID(); + + this.client = new OkHttpClient(); + + this.uploadTaskDefinition(); + } + + protected void uploadTaskDefinition(){ + JsonObject tdef = Json.createObjectBuilder() + .add("name", this.taskName) + .add("ownerEmail", "m.lettere@gmail.com") + .add("description", "Example task doing nothing actually") + .build(); + JsonArray tdefs = Json.createArrayBuilder() + .add(tdef) + .build(); + HttpUrl u = HttpUrl.parse(this.url + taskdefUrl); + RequestBody body = RequestBody.create(tdefs.toString(), JSON); + Request req = new Request.Builder() + .url(u) + .post(body) + .build(); + try { + Response resp = this.client.newCall(req).execute(); + System.out.println(resp.code()); + System.out.println(resp.body().string()); + } catch (Exception e) { + System.err.println(e); + } + } + + protected JsonObject pollTask(){ + JsonObject task = null; + HttpUrl u = HttpUrl.parse(this.url + taskPollUrl + "/" + this.taskName + "?workerid=" + this.workerId); + Request req = new Request.Builder().url(u).get().build(); + System.out.println("Polling ... "); + while(task == null){ + try { + //System.out.println("Polling ... "); + Response resp = this.client.newCall(req).execute(); + //System.out.println(resp.code()); + if(resp.code() == 200){ + task = Json.createReader(new StringReader(resp.body().string())).readObject(); + System.out.println("Task is " + task.getJsonObject("taskDefinition").getString("name")); + }else if(resp.code() != 204){ + System.err.println("Error while polling ... " + resp.body().string()); + } + Thread.sleep(1000); + } catch (Exception e) { + System.err.println(e); + } + } + return task; + } + + protected void updateTask(JsonObject task, String status, ArrayList logs){ + JsonArrayBuilder logsBuilder = Json.createArrayBuilder(); + for(String l: logs){ + logsBuilder.add(l); + } + JsonObject tstat = Json.createObjectBuilder() + .add("status", status) + .add("logs", logsBuilder.build()) + .add("description", "Example task doing nothing actually") + .add("taskId", task.getString("taskId")) + .add("workflowInstanceId", task.getString("workflowInstanceId")) + .build(); + + HttpUrl u = HttpUrl.parse(this.url + taskUpdateUrl); + RequestBody body = RequestBody.create(tstat.toString(), JSON); + Request req = new Request.Builder() + .url(u) + .post(body) + .build(); + try { + Response resp = this.client.newCall(req).execute(); + System.out.println(resp.code()); + System.out.println(resp.body().string()); + } catch (Exception e) { + System.err.println(e); + } + } + + protected void handleTask(JsonObject task){ + String name = task.getJsonObject("taskDefinition").getString("name"); + String id = task.getString("taskId"); + System.out.println("Handling task " + id + " " + name); + int cycles = task.getJsonObject("inputData").containsKey("cycles") ? + task.getJsonObject("inputData").getInt("cycles") : 1; + ArrayList logs = new ArrayList(); + for(int i=0; i < cycles; i++){ + try { + logs.add("Entering cycle " + i); + Thread.sleep(1000); + logs.add("After cycle " + i + " still ok"); + this.updateTask(task, "IN_PROGRESS", logs); + logs.clear(); + if(i % 7 == 6) throw new Exception("Force fail"); + } catch (Exception e) { + e.printStackTrace(); + logs.add("At cycle " + i + " I've been interrupted"); + this.updateTask(task, "FAILED", logs); + } + + } + this.updateTask(task, "COMPLETED", logs); + } + + protected void run(){ + while(true){ + JsonObject t = this.pollTask(); + this.handleTask(t); + } + } + + public static void main( String[] args ){ + System.out.println( "Starting MyWorker!" ); + MyWorker w = new MyWorker("http://localhost:8080", "mytask"); + w.run(); + } +} diff --git a/jworker/src/test/java/com/nubisware/AppTest.java b/jworker/src/test/java/com/nubisware/AppTest.java new file mode 100644 index 0000000..28a7b89 --- /dev/null +++ b/jworker/src/test/java/com/nubisware/AppTest.java @@ -0,0 +1,20 @@ +package com.nubisware; + +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +/** + * Unit test for simple App. + */ +public class AppTest +{ + /** + * Rigorous Test :-) + */ + @Test + public void shouldAnswerWithTrue() + { + assertTrue( true ); + } +} diff --git a/jworker/target/classes/com/nubisware/MyWorker.class b/jworker/target/classes/com/nubisware/MyWorker.class new file mode 100644 index 0000000..03fc4d5 Binary files /dev/null and b/jworker/target/classes/com/nubisware/MyWorker.class differ diff --git a/jworker/target/test-classes/com/nubisware/AppTest.class b/jworker/target/test-classes/com/nubisware/AppTest.class new file mode 100644 index 0000000..34c02c6 Binary files /dev/null and b/jworker/target/test-classes/com/nubisware/AppTest.class differ diff --git a/workflows/test_myworker_workflow.json b/workflows/test_myworker_workflow.json new file mode 100644 index 0000000..71a1687 --- /dev/null +++ b/workflows/test_myworker_workflow.json @@ -0,0 +1,41 @@ +{ + "ownerApp": null, + "createTime": 1666338703554, + "updateTime": 1666345110218, + "createdBy": null, + "updatedBy": null, + "name": "test_myworker", + "description": "Test myworker", + "version": 1, + "tasks": [ + { + "name": "init", + "taskReferenceName": "init", + "description": null, + "inputParameters": { + "cycles": 5 + }, + "type": "SET_VARIABLE" + }, + { + "name": "mytask", + "taskReferenceName": "test_mytask", + "description": "Task to test mytask", + "inputParameters": { + "cycles": "${workflow.variables.cycles}" + }, + "type": "SIMPLE" + } + ], + "inputParameters": [], + "outputParameters": {}, + "failureWorkflow": null, + "schemaVersion": 2, + "restartable": true, + "workflowStatusListenerEnabled": false, + "ownerEmail": "example@email.com", + "timeoutPolicy": "ALERT_ONLY", + "timeoutSeconds": 0, + "variables": {}, + "inputTemplate": {} +}