Building Streaming and Cross-Environment Data Processing Pipelines with RDF-Connect

Ieben Smessaert, Arthur Vercruysse, Julián Rojas Meléndez, Pieter Colpaert

Ghent University – imec – IDLab, Belgium

Tutorial website: open.gent/r/iswc-rdfc

ISWC 2025, November 2, 2025

Download dependencies while we're waiting


# Clone the tutorial repository
git clone git@github.com:rdf-connect/nara-weather-forecast-kg-pipeline.git
cd nara-weather-forecast-kg-pipeline/pipeline/resources

# Start the development container
docker compose up -d

# Open a shell inside the container
docker compose exec devbox bash

# Prepare the Python environment for the processor
cd processor/
hatch env create
hatch shell
    
⏳ These commands preload dependencies and images in the background so everything is ready when we start coding.
✅ You’ll end up inside a fully configured devbox environment.
QR code for tutorial guideline repository
open.gent/r/iswc-rdfc-repo

Agenda

Tutorial overview

Theory and practice intertwined

You will learn the motivation behind RDF-Connect, its conceptual model, architecture, and roadmap, all while setting up, extending, and running an example RDF-Connect pipeline.

Example pipeline: An RDF KG lifecycle

A pipeline implementing a knowledge graph lifecycle process, where weather data from the Japan Meteorological Agency (JMA) API is collected, transformed into RDF, enriched, validated against a SHACL shape, and published to an RDF graph store.

Tutorial resources

Tutorial Website

The tutorial website has a complete description and motivation for the tutorial. There you may also find all the resources you need to follow along, including these slides.

Developer resources

We have prepared a GitHub repository containing a step-by-step guide (split over dedicated branches) that will allow you to start and check the result of any task of the tutorial at any time.

Assembling our very first pipeline

Pipeline 1 Diagram

We want to [fetch data from the JMA meteorological forecast API] and [log its contents] to the console.

Follow along in our step-by-step
tutorial code repository

QR code for tutorial guideline repository
open.gent/r/iswc-rdfc-repo

Step 0: Choose your environment

Run locally or in a containerized environment:


# Build and run the Docker image
cd pipeline/resources
docker compose up -d

# Access the devbox container
docker compose exec devbox bash
cd pipeline/

# You can now run commands like `npm install` or `npx rdfc pipeline.ttl`
# inside the container
  
This way you avoid having to install runtime environments for Python, Node.js, Java, etc.

Step 1: Setup

Install the orchestrator, runner, and processors:


npm install @rdfc/orchestrator-js
npm install @rdfc/js-runner
npm install @rdfc/http-utils-processor-ts
npm install @rdfc/log-processor-ts
  
🛠️ The log processor is also available in Python.
You can swap it to see cross-language interoperability in action!
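
As a rough sketch of what that swap could look like in pipeline.ttl (the Python processor class name below is hypothetical; the actual IRI and owl:imports path come from the Python package's own processor.ttl, and the rdfc:PyRunner setup is covered later in this tutorial):


# Hypothetical sketch: the Python log processor's class name and import path may differ.
<logger> a rdfc:LogProcessorPy;        # hypothetical Python variant of the log processor
    rdfc:reader <json>;
    rdfc:level "info";
    rdfc:label "output".

<> a rdfc:Pipeline;
   rdfc:consistsOf [
       rdfc:instantiates rdfc:NodeRunner;
       rdfc:processor <fetcher>
   ], [
       rdfc:instantiates rdfc:PyRunner;  # Python runner, introduced later in this tutorial
       rdfc:processor <logger>
   ].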

Step 2: Initialize pipeline.ttl

Add the prefixes rdfc, owl, ex


@prefix rdfc: <https://w3id.org/rdf-connect#>.
@prefix owl: <http://www.w3.org/2002/07/owl#>.
@prefix ex: <http://example.org/>.
    

Declare the RDF-Connect pipeline


<> a rdfc:Pipeline.
    

Step 3: Add the rdfc:NodeRunner

Import definition via owl:imports


<> owl:imports <./node_modules/@rdfc/js-runner/index.ttl>.
    

Attach it to the pipeline declaration


<> a rdfc:Pipeline;
   rdfc:consistsOf [
       rdfc:instantiates rdfc:NodeRunner;
   ].
    

Step 4: Add the rdfc:HttpFetch processor

Import definition via owl:imports


<> owl:imports <./node_modules/@rdfc/http-utils-processor-ts/processors.ttl>.
    

Define the channel


<json> a rdfc:Reader, rdfc:Writer.
    

Define the processor instantiation


<fetcher> a rdfc:HttpFetch;
    rdfc:url "https://www.jma.go.jp/bosai/forecast/data/overview_forecast/290000.json";
    rdfc:writer <json>.
    

Attach the processor to the runner


<> a rdfc:Pipeline;
   rdfc:consistsOf [
       rdfc:instantiates rdfc:NodeRunner;
       rdfc:processor <fetcher> ].
    

Step 5: Add the rdfc:LogProcessorJs

Import definition via owl:imports


<> owl:imports <./node_modules/@rdfc/log-processor-ts/processor.ttl>.
    

Define the processor instantiation


<logger> a rdfc:LogProcessorJs;
    rdfc:reader <json>;
    rdfc:level "info";
    rdfc:label "output".
    

Attach the processor to the runner


[ rdfc:instantiates rdfc:NodeRunner;
  rdfc:processor <fetcher>, <logger> ].
    

Step 6: Run the pipeline


npx rdfc pipeline.ttl
# or with debug logging:
LOG_LEVEL=debug npx rdfc pipeline.ttl
  

✅ Solution available in task-1 branch.

Now try tasks 0 & 1 yourself!

  1. Setup
  2. Initialize pipeline.ttl
  3. Add the rdfc:NodeRunner
  4. Add the rdfc:HttpFetch processor
  5. Add the rdfc:LogProcessorJs
  6. Run the pipeline
QR code for tutorial guideline repository
open.gent/r/iswc-rdfc-repo

Data processing pipelines are crucial in modern data-centric systems

They enable the transformation, integration, and analysis of data from and to various sources and targets.

Pipelines are usually composed of multiple complex tasks

However, building, managing and reusing these pipelines can be complex and challenging.

Pipelines are ubiquitous in real-world systems

Our main motivation use case

We have faced, again and again, the challenges of managing the lifecycle of Knowledge Graphs across multiple domains.

Stream processing: a computational paradigm for continuous and dynamic data systems

Traditional batch processing systems suffer from latency problems due to the need to collect input data into batches before it can be processed. — Isah, H., et al., A Survey of Distributed Data Stream Processing Frameworks, IEEE Access, 2019

Current real-world data systems often require real-time or near-real-time processing of dynamic data. Stream processing allows for the continuous ingestion and processing of data as it arrives, enabling timely insights and actions.

Cross-environment execution: choosing the best of all worlds

The ability to execute applications written in different programming languages in an integrated manner offers several advantages:

Polyglot Architecture

Declarative, reusable and provenance-aware data processing

Scientists want to use provenance data to answer questions such as: Which data items were involved in the generation of a given partial result? or Did this actor employ outputs from one of these two other actors? — Cuevas-Vicenttín, V., et al., Scientific Workflows and Provenance: Introduction and Research Opportunities, Datenbank-Spektrum, 2012
Provenance is instrumental to activities such as traceability, reproducibility, accountability, and quality assessment. — Herschel, M., et al., A Survey on Provenance: What for? What form? What from?, The VLDB Journal, 2017
Prospective provenance—the execution plan—is essentially the workflow itself: it includes a machine-readable specification with the processing steps to be performed and the data and software dependencies to carry out each computation. — Leo, S., et al., Recording provenance of workflow runs with RO-Crate, PLoS ONE, 2024

RDF-Connect requirements overview

Aren't there like a million pipeline frameworks already?

Related work

Existing pipeline frameworks
https://github.com/common-workflow-language/common-workflow-language/wiki/Existing-Workflow-systems
https://github.com/pditommaso/awesome-pipeline

Enter RDF-Connect

RDF-Connect — Key features

A worthy mention:
Common Workflow Language

Common Workflow Language Logo

Common Workflow Language (CWL) is an open standard for describing how to run command line tools and connect them to create workflows.

Common Workflow Language Description

How is RDF-Connect different from CWL?

Feature | RDF-Connect | CWL
Streaming support | Event-based design that supports both batch and streaming paradigms | Primarily batch-oriented, although implementation-dependent streaming can be supported (e.g., using named pipes)
Polyglot | Supports any language through an add-in libraries approach | Can accommodate polylingual workflows via POSIX CLI interfaces
Provenance | Built-in semantic prospective and retrospective provenance tracking based on PROV-O | Retrospective provenance extension available (CWLProv) based on PROV-O
Schema expressivity | Full SHACL-based expressivity | Set of defined types and limited constraint definitions

Another worthy mention:
Workflow Run RO-Crate profiles

RO-Crates workflows

Workflow Run RO-Crate profiles provide a semantic way to describe workflows, including their executions and associated metadata.

Coffee break! ☕

Agenda

Running example: the goal

  1. Retrieve JSON data from the JMA weather forecast API
  2. Transform the data into RDF using RML
  3. Translate language-tagged literals into another language
  4. Validate the RDF data against a SHACL shape
  5. Publish the RDF data to a triple store
Running example - the goal

High-level architecture overview

RDF-Connect generic architecture

Pipeline File Structure

A pipeline is described in RDF configuration files:

  • 🔗 Channels
    Define how data flows between processors.
  • 📦 Runners
    Specify which runtime environments are needed.
  • ⚙️ Processors
    Tasks that run inside a runner.
Pipeline file structure
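
Put together, the three building blocks look as follows in a pipeline file. This is a condensed version of the pipeline assembled in the hands-on steps this morning (owl:imports statements omitted):


@prefix rdfc: <https://w3id.org/rdf-connect#>.

<json> a rdfc:Reader, rdfc:Writer.          # channel: connects a writing processor to a reading one

<> a rdfc:Pipeline;
   rdfc:consistsOf [
       rdfc:instantiates rdfc:NodeRunner;   # runner: the runtime environment (Node.js here)
       rdfc:processor <fetcher>, <logger>   # processors: tasks hosted by this runner
   ].

<fetcher> a rdfc:HttpFetch;                 # writes the fetched payload into the channel
    rdfc:url "https://www.jma.go.jp/bosai/forecast/data/overview_forecast/290000.json";
    rdfc:writer <json>.

<logger> a rdfc:LogProcessorJs;             # reads from the channel and logs the messages
    rdfc:reader <json>;
    rdfc:level "info";
    rdfc:label "output".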

RDF-Connect data model overview

RDF-Connect data model

RDF-Connect logical inference over a processor

General definitions in the RDFC ontology:


# Processor class definition
rdfc:Processor a rdfs:Class;
    rdfs:subClassOf prov:Activity.

rdfc:implementationOf a rdf:Property;
    rdfs:subPropertyOf rdfs:subClassOf.

# Property for JavaScript processors
rdfc:jsImplementationOf a rdf:Property;
    rdfs:subPropertyOf rdfc:implementationOf.
    

# JavaScript Runner definition
<myJSRunner> a rdfc:Runner;
    rdfc:handlesSubjectsOf rdfc:jsImplementationOf;
    rdfc:command "npx js-runner".
    

RDF-Connect logical inference over a processor

Concrete processor definition:


# Language-specific processor definition
ex:LogProcessorJS rdfc:jsImplementationOf rdfc:Processor;
  rdfs:label "Simple Log Processor for JavaScript";
  rdfs:comment "Logs incoming messages";
  rdfc:entrypoint <./>;
  rdfc:file <./lib/util_processors.js>;
  rdfc:class "LogProcessor".
    

# Processor instantiation in pipeline
_:p1 a ex:LogProcessorJS;
  ...
    

Following the simple entailment relations, we obtain that:

check it online


rdfc:jsImplementationOf rdfs:subPropertyOf rdfs:subClassOf.

ex:LogProcessorJS rdfc:implementationOf rdfc:Processor;
  rdfs:subClassOf rdfc:Processor;
  rdfs:subClassOf prov:Activity.

_:p1 a rdfc:Processor, prov:Activity.
    

Pipeline design of running example

Pipeline design architecture

Deep Dive: SHACL Shapes

Each runner and processor comes with a SHACL shape.
These shapes serve as the glue of RDF-Connect: they describe which arguments a processor or runner accepts and how the configuration in pipeline.ttl maps onto the implementation.

Deep Dive: SHACL Shapes Example

Deep Dive Orchestrator: Overview

Deep Dive Orchestrator: Responsibilities

  1. 📜 Initialize Pipeline
    1. ▶️ Start runners
      Launch each runner using its configured command.
    2. ⚙️ Initialize processors
      Instruct runners to start the processors they manage.
  2. 📡 Route messages
    Deliver incoming messages to the correct runner / processor.
Pipeline 1 Diagram

Deep Dive Orchestrator: Message Types

Design decision: channels connect processors one-to-one.

Message Types: Single Message

  1. Send the message
  2. Process the message
  3. Acknowledge message processed
Pipeline 1 Diagram

Message Types: Streaming Message

Pipeline 1 Diagram

Deep Dive Runner: Overview

Currently, runners exist for JavaScript, JVM, and Python.

Deep Dive Runner: Responsibilities

  1. ▶️ Start from command line
    Runners can be launched as standalone processes.
  2. 🔌 Connect with orchestrator via gRPC
    Handle control messages and data exchange.
  3. ⚙️ Manage processors
    Start, stop, and monitor the processors they host and forward log messages.

Deep Dive Runner: Example

Example of a runner configuration in RDF (Turtle):
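
A sketch along those lines, based on the generic runner definition shown earlier in the data-model slides (the actual js-runner configuration is the index.ttl shipped with the @rdfc/js-runner package):


# Sketch following the generic runner definition from the data-model slides.
<myJSRunner> a rdfc:Runner;
    # command the orchestrator uses to launch the runner as a standalone process
    rdfc:command "npx js-runner";
    # the runner handles every processor declared via rdfc:jsImplementationOf
    rdfc:handlesSubjectsOf rdfc:jsImplementationOf.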

Deep Dive Processor: Overview

Deep Dive: Centralized Logging

JavaScript Ecosystems

Java Ecosystems

Python Ecosystem

Hands-On

  1. 🌦 HTTP Fetch → Log contents
  2. 🔄️ Weather API → RDF → Log
  3. 🧩️ Weather API → RDF → Validation → Log
  4. 🚀️ Weather API → RDF → Validation → Publish → Log
  5. 🤖 Implement your own ML processor in Python
  6. ✅ Weather API → RDF → Translation → Validation → Publish → Log

Follow along in the GitHub repository.
All tasks are in the README. Each branch is a solution to a task!
open.gent/r/iswc-rdfc-repo

QR code for tutorial guideline repository

What we already did this morning

Pipeline 1 Diagram

Follow along on branch task-1, or jump to the slides for a recap.

What we already did this morning


@prefix rdfc: <https://w3id.org/rdf-connect#>.
@prefix owl: <http://www.w3.org/2002/07/owl#>.
@prefix ex: <http://example.org/>.
      

Recap: Running the pipeline

Start the orchestrator with the configuration file:


npx rdfc pipeline.ttl
  
✅ You should see the HTTP contents being logged.

Hands-On: Pipeline

  1. 🌦 HTTP Fetch → Log contents
  2. 🔄️ Weather API → RDF → Log
  3. 🧩️ Weather API → RDF → Validation → Log
  4. 🚀️ Weather API → RDF → Validation → Publish → Log

Follow along in the GitHub repository.
All tasks are in the README. Each branch is a solution to a task!
open.gent/r/iswc-rdfc-repo

QR code for tutorial guideline repository

Pipeline design: Weather KG

Pipeline 2 Diagram

Weather KG Pipeline: JavaScript Setup

Install the additionally required processors:


npm install @rdfc/file-utils-processors-ts
npm install @rdfc/shacl-processor-ts
npm install @rdfc/sparql-ingest-processor-ts
  

Weather KG Pipeline: Java Setup

Add the required dependency to your Gradle build file:


plugins { id 'java' }
repositories {
    mavenCentral()
    maven { url = uri("https://jitpack.io") }
}
dependencies {
    implementation("com.github.rdf-connect:rml-processor-jvm:master-SNAPSHOT:all")
}
tasks.register('copyPlugins', Copy) {
    from configurations.runtimeClasspath
    into "$buildDir/plugins"
}
    

Install the jars with gradle copyPlugins.

The jvm-runner itself is downloaded automatically, so no manual installation is required.

If you do not want to use Gradle, you can also download the jars manually and put them in the build/plugins/ folder.


wget 'https://jitpack.io/com/github/rdf-connect/rml-processor-jvm/master-SNAPSHOT/rml-processor-jvm-master-SNAPSHOT-all.jar'
    

Run the Weather KG Pipeline

Start the orchestrator with the configuration file:


npx rdfc pipeline.ttl
  
Don't forget to start a SPARQL endpoint!
→ We provide a docker-compose.yml with a Virtuoso instance configured.

Now try it yourself!

🛠️ Follow Part 1 in the repo (up to Task 4)

⏰ You have time till lunch

🙋 Ask questions!

Pipeline 2 Diagram
QR code for tutorial guideline repository
open.gent/r/iswc-rdfc-repo

Lunch break! 🍣

Agenda

Hands-On

  1. 🌦 HTTP Fetch → Log contents
  2. 🔄️ Weather API → RDF → Log
  3. 🧩️ Weather API → RDF → Validation → Log
  4. 🚀️ Weather API → RDF → Validation → Publish → Log
  5. 🤖 Implement your own ML processor in Python
  6. ✅ Weather API → RDF → Translation → Validation → Publish → Log

Follow along in the GitHub repository.
All tasks are in the README. Each branch is a solution to a task!
open.gent/r/iswc-rdfc-repo

QR code for tutorial guideline repository

Implement a processor

  1. Define the processor and its SHACL shape in processor.ttl
  2. Implement the processor class
    • Extend the abstract class
    • Implement the init, transform, and produce methods
    • Use Readers & Writers to handle messages
  3. Publish the processor
  4. Use the processor in a pipeline

Define the processor in processor.ttl


@prefix rdfc: <https://w3id.org/rdf-connect#>.
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>.
      
Python logo
JS/TS logo
Java logo

Define the processor in processor.ttl

                
@prefix rdfc: <https://w3id.org/rdf-connect#>.
@prefix sh: <http://www.w3.org/ns/shacl#>.
@prefix xsd: <http://www.w3.org/2001/XMLSchema#>.
                
            

sh:targetClass links back to the IRI used on previous slide

Custom class for readers and writers: rdfc:Reader & rdfc:Writer

sh:name links to variable name in code

sh:path links to property in pipeline.ttl

Optional and multiple arguments with sh:minCount and sh:maxCount
(sh:maxCount != 1 results in a list of arguments)
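
Putting these pieces together, a minimal sketch of such a shape for the log processor used this morning, reusing the prefixes declared above (an illustration only; the actual shape shipped with @rdfc/log-processor-ts may differ in its details):


# Minimal illustrative sketch; the real shape in @rdfc/log-processor-ts may differ.
[ ] a sh:NodeShape;
  sh:targetClass rdfc:LogProcessorJs;   # the IRI instantiated in pipeline.ttl
  sh:property [
    sh:path rdfc:reader;                # property used in pipeline.ttl
    sh:name "reader";                   # variable name in the processor code
    sh:class rdfc:Reader;               # custom class for channel readers
    sh:minCount 1;                      # required ...
    sh:maxCount 1                       # ... and a single argument, not a list
  ], [
    sh:path rdfc:level;
    sh:name "level";
    sh:datatype xsd:string;
    sh:minCount 0;                      # optional argument
    sh:maxCount 1
  ].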

Implement the processor class

Python logo
JS/TS logo
Java logo

Start from a processor template repository

TypeScript
https://github.com/rdf-connect/template-processor-ts

Python
https://github.com/rdf-connect/template-processor-py

Java
https://github.com/rdf-connect/template-processor-jvm

Implement a Python processor

  1. In the transform method: consume the reader channel
  2. Parse the input using rdflib
  3. Identify language-tagged literals with @ja
  4. Translate them to English using an ML model
  5. Emit both original and translated triples to the writer channel
  6. Define the processor in processor.ttl
processor flow
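
To make the intended behaviour concrete, here is a hypothetical before/after for a single literal (the subject, predicate, and text are illustrative, not the actual RML mapping output):


@prefix ex: <http://example.org/>.

# Illustrative input arriving on the reader channel (subject, predicate, and text are made up)
ex:forecast290000 ex:overview "奈良県では、雨が降るでしょう。"@ja .

# Emitted on the writer channel: the original literal plus its English translation
ex:forecast290000 ex:overview "奈良県では、雨が降るでしょう。"@ja ,
                              "In Nara Prefecture, it will likely rain."@en .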

Add the Python runner

Set up the pyproject.toml for your pipeline

Configure a specific Python version to have a deterministic path to the dependencies.

Add the rdfc-runner as a dependency.

Add the Python runner

Install the runner:


uv add rdfc_runner
  

Import definition via owl:imports


<> owl:imports <./.venv/lib/python3.13/site-packages/rdfc_runner/index.ttl>.
    

Attach it to the pipeline declaration


<> a rdfc:Pipeline;
   rdfc:consistsOf [...], [
       rdfc:instantiates rdfc:PyRunner;
       rdfc:processor <translator>
   ].
    

Add your translation processor

Install your local processor after building it with hatch build:


uv add ../processor/dist/rdfc_translation_processor.tar.gz
    

Import definition via owl:imports


<> owl:imports <./.venv/lib/python3.13/site-packages/rdfc_translation_processor/processor.ttl>.
    

Define the channel


<translated> a rdfc:Reader, rdfc:Writer.
    

Define the processor instantiation


<translator> a rdfc:TranslationProcessor;
    rdfc:reader <rdf>;
    rdfc:writer <translated>;
    ... .
    

Hands-On: Processor

  1. 🤖 Implement your own ML processor in Python
  2. ✅ Weather API → RDF → Translation → Validation → Publish → Log

Follow along in the GitHub repository.
All tasks are in the README. Each branch is a solution to a task!
open.gent/r/iswc-rdfc-repo

QR code for tutorial guideline repository

Pipeline design: Weather KG With ML

Pipeline 3 Diagram

Now try it yourself!

🛠️ Follow Part 2 in the repo (Task 5 - 7)

🙋 Ask questions!

Pipeline 2 Diagram
QR code for tutorial guideline repository
open.gent/r/iswc-rdfc-repo

What is next for RDF-Connect?

We envision the following development and research roads:

Community-driven metadata initiatives

Several initiatives exist for the standardization of workflow metadata.

Workflows Community logo WRRO-Crate logo OpenMetadata logo OpenLineage logo

Workflows Community Initiative (WCI)

The WCI aims to foster collaboration and standardization in the field of scientific workflow management. It provides a common framework for describing workflows, their components, and execution metadata. By aligning RDF-Connect with WCI standards, we can enhance interoperability and facilitate the sharing of workflow metadata across different platforms and tools.

Workflows Community terminology Workflows Community registries

WorkflowHub

WorkflowHub is a platform for sharing and discovering scientific workflows. It provides a repository for workflow definitions, metadata, and execution records.

WorkflowHub

Dockstore

Dockstore is a platform for sharing reusable and scalable analytical tools and workflows. It supports a variety of workflow languages and provides features for versioning, collaboration, and execution tracking.

Dockstore

Workflow Run RO-Crate profiles

The Workflow Run RO-Crate profiles extend the RO-Crate specification to better support the description of workflow executions and their associated metadata. A semantic alignment between RDF-Connect and the Workflow Run RO-Crate profiles will enable seamless integration of workflow execution metadata into RO-Crates, facilitating better reproducibility and sharing of scientific workflows.

Workflow Run RO-Crate model

OpenMetadata platform

OpenMetadata is an open-source metadata management platform that provides a unified view of data assets across an organization. It offers features for data discovery, lineage tracking, and governance.

OpenMetadata platform OpenMetadata schemas

OpenLineage framework

OpenLineage is an open standard for metadata and lineage collection designed to instrument data pipelines and applications.

OpenLineage framework

Live monitoring capabilities

Integration of RDF-Connect with monitoring systems such as Prometheus will allow real-time tracking of pipeline execution, resource utilization, and performance metrics, enabling users to monitor and optimize their workflows effectively.

Prometheus architecture Prometheus metrics

Support for other programming languages

RDF-Connect extension to support other languages such as:

Rust Go C++

Support for federated and cloud-native execution

Remote execution of RDF-Connect Runners beyond the CLI, for instance within EOSC (European Open Science Cloud) nodes.

EOSC Node

Automation of workflow development and management

Leverage generative AI capabilities to automate Processor and Pipeline development. Also, provide UI-based pipeline management.

RDF-Connect AI RDF-Connect UI

Optimization of data flows

Zero-copy data movement: Integration with Apache Arrow (where possible) to optimize data flow performance and efficiency.

Genomics workflow with Apache Arrow R Workflow with Apache Arrow ML Workflow with Apache Arrow

Coffee break! ☕

Let's Code Something Together

Goal: build a pipeline using software that’s going to be introduced later during ISWC.
Each of you can contribute by wrapping new or existing software as a processor compatible with RDF-Connect.

Once we have a few, we’ll connect them into a shared pipeline and see what we can create together!

Do you know of any software we could try? We’ve spotted some ideas already: Jelly, RDFMutate, pycottas, rdf2vecgpu, or we could even generate data cubes.

What Could Our Pipeline Do?

We already have plenty of ideas for individual processors — but what about the bigger picture? What could a complete pipeline actually achieve?

Let’s brainstorm: combine analysis, transformation, or visualization — something fun and meaningful that shows what RDF-Connect can do when we link our work together.

Hackathon Flow 💡

  1. Pick a tool or idea — something new, weird, or cool you want to plug in.
  2. Wrap it as a processor — make it talk RDF-Connect style.
  3. Test it! — see if it runs standalone, maybe share your results.
  4. Connect it — link your processor into our shared pipeline.
  5. Celebrate 🎉 — watch the data flow and see what we built together!

👉 Don’t worry about perfection — the goal is to explore, experiment, and have fun connecting ideas.

Please create a GitHub repository for your processor and let us link them together in a pipeline.

Building Streaming and Cross-Environment Data Processing
Pipelines with RDF-Connect

Thank you!

We sincerely hope you enjoyed this tutorial
and found it valuable.

QR code to the RDF-Connect homepage
w3id.org/rdf-connect