Raise an error when using the read function on Executor to ensure heavy processing is only performed by the Worker #3126

Closed
anna-geller opened this issue Feb 24, 2024 · 0 comments · Fixed by #3214
anna-geller commented Feb 24, 2024

Suggested implementation

Tasks that update the execution, such as the Labels task, are processed by the Executor.
We can restrict the read function so that it only runs on the Worker. Using read in the Labels task would then raise an error with a friendly message pointing the user to process the labels in a different task (e.g. a Return task) and pass the result to the Labels task via outputs.
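
A rough sketch of the workaround that the error message could point to, based on the label_site_codes task from the full example in this issue. The task id `site_codes` is hypothetical, and the Return task type (`io.kestra.core.tasks.debugs.Return`) with its `format` property and `value` output is assumed from the core tasks rather than confirmed here:

```yaml
  # Hypothetical intermediate task: runs on the Worker, so read() is allowed here.
  # Assumes the Return task renders `format` and exposes the result as `value`.
  - id: site_codes
    type: io.kestra.core.tasks.debugs.Return
    format: "{{ read(outputs.latest_maps.outputFiles['siteCodes.json']) }}"

  # The Labels task is processed by the Executor, so it only references
  # the already-rendered output instead of reading the file itself.
  - id: label_site_codes
    type: io.kestra.core.tasks.executions.Labels
    labels:
      sites: "{{ outputs.site_codes.value }}"
```

With this split, the file read happens in a Worker task and the Executor only handles a short string.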

Context

Examples where the processing could be expensive if the file is large, and should therefore happen on the Worker:

    inputFiles:
      data.json: "{{ read(outputs.json.uri).split('\n') }}"

    # and:
    labels:
      sites: "{{ read(outputs.latest_maps.outputFiles['siteCodes.json']) }}"

Full example:

id: different_sitemap_consumer
namespace: neon_site_map
tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    # disabled: false
    groupId: kestra-nonprodmap
    maxRecords: 1
    properties:
      auto.offset.reset: earliest
      bootstrap.servers: "{{ secret('KAFKA_HOST') }}"
      sasl.jaas.config: >-
        org.apache.kafka.common.security.scram.ScramLoginModule required 
        username="{{ secret('KAFKA_USER') }}" 
        password="{{ secret('KAFKA_PASSWORD') }}";
      sasl.mechanism: SCRAM-SHA-512
      security.protocol: SASL_PLAINTEXT
    topic: 
      - cper.event.cnc.map
      - abby.event.cnc.map
      - clbj.event.cnc.map
      - como.event.cnc.map
      - barr.event.cnc.map
      - bart.event.cnc.map
      - blue.event.cnc.map
      - cari.event.cnc.map
    valueDeserializer: JSON

  - id: label_msg_count
    type: io.kestra.core.tasks.executions.Labels
    labels:
      count: "Consumed {{outputs.consume.messagesCount}} message(s)"

  - id: empty_fail
    type: io.kestra.core.tasks.executions.Fail
    errorMessage: "Consumed no messages"
    condition: "{{ outputs.consume.messagesCount == 0 }}"

  - id: json
    type: io.kestra.plugin.serdes.json.JsonWriter
    from: "{{ outputs.consume.uri }}"

  - id: latest_maps
    type: io.kestra.plugin.scripts.node.Script
    inputFiles:
      data.json: "{{ read(outputs.json.uri).split('\n') }}"
    # beforeCommands:
    #  - npm i deep-object-diff
    script: |
      const fs = require('fs')
      const maps =  JSON.parse(fs.readFileSync('data.json', 'utf-8'))
      const lastMaps = maps.reduce((acc, curr) => {
        const map = JSON.parse(curr)
        acc[map.key] = map
        return acc
      }, {});
      //console.log(JSON.stringify(lastMaps))
      const output = Object.keys(lastMaps).reduce((acc, curr) => {
        acc += JSON.stringify(lastMaps[curr]) + "\n"
        return acc
      }, "");
      try {
        fs.writeFileSync("{{ outputDir }}/maps", output)
        fs.writeFileSync("{{ outputDir }}/siteCodes.json", JSON.stringify(Object.keys(lastMaps)))
      } catch (e) {
        console.error(e);
      }

  - id: label_site_codes
    type: io.kestra.core.tasks.executions.Labels
    labels:
      sites: "{{ read(outputs.latest_maps.outputFiles['siteCodes.json']) }}"
  
  - id: each_map
    type: io.kestra.core.tasks.flows.ForEachItem
    items: "{{ outputs.latest_maps.outputFiles['maps'] }}"
    batch:
      rows: 1
    namespace: neon_site_map
    flowId: site_map_processor
    wait: true 
    transmitFailed: true 
    inputs:
      map: "{{ taskrun.items }}" 
anna-geller added the enhancement (New feature or request) label on Feb 24, 2024
anna-geller added this to the v0.16.0 milestone on Feb 24, 2024
anna-geller changed the title from "Validate that processing inputFiles and adding execution labels happens on the Worker" to "Raise an error when using the read function in the Labels task to ensure heavy processing is only performed by the Worker" on Feb 25, 2024
anna-geller changed the title from "Raise an error when using the read function in the Labels task to ensure heavy processing is only performed by the Worker" to "Raise an error when using the read function on Executor to ensure heavy processing is only performed by the Worker" on Feb 26, 2024
loicmathieu self-assigned this on Mar 5, 2024
loicmathieu moved this to Todo in Docs Needed on Mar 15, 2024