Building Streaming and Cross-Environment Data Processing Pipelines with RDF-Connect

Ieben Smessaert, Arthur Vercruysse, Julián Rojas Meléndez, Pieter Colpaert

SEMANTiCS 2025, September 3-5, 2025


Ghent University – imec – IDLab, Belgium

Tutorial website: open.gent/r/semantics-RDFC

SEMANTiCS 2025, September 3, 2025

Agenda

Tutorial overview

Part 1: Introduction to RDF-Connect

You will learn the motivation behind RDF-Connect, its conceptual model and architecture, by following a running example of a knowledge graph lifecycle pipeline.

Part 2: Hands-on

You will implement an ML-based processor and integrate it into the knowledge graph lifecycle pipeline.

KG lifecycle pipeline

A pipeline for a knowledge graph lifecycle process, in which weather data (from an Austrian API service) is collected, transformed into RDF, validated against a SHACL shape, enriched, and published to an RDF graph store.

Assembling our very first pipeline

Pipeline 1 Diagram

We want to fetch data from the GeoSphere weather API
and log its contents to the console.

We provide a step-by-step tutorial code repository:

QR code for tutorial guideline repository
open.gent/r/semantics-repo

Step 1: Setup

Install the orchestrator, runner, and processors:


npm install @rdfc/orchestrator-js
npm install @rdfc/js-runner
npm install @rdfc/http-utils-processor-ts
npm install @rdfc/log-processor-ts
  
🛠️ The log processor is also available in Python.
You can swap it to see cross-language interoperability in action!

Step 2: Initialize pipeline.ttl

Add the prefixes rdfc, owl, ex


@prefix rdfc: <https://w3id.org/rdf-connect#>.
@prefix owl: <http://www.w3.org/2002/07/owl#>.
@prefix ex: <http://example.org/>.
    

Declare the RDF-Connect pipeline


<> a rdfc:Pipeline.
    

Step 3: Add the rdfc:NodeRunner

Import definition via owl:imports


<> owl:imports <./node_modules/@rdfc/js-runner/index.ttl>.
    

Attach it to the pipeline declaration


<> a rdfc:Pipeline;
   rdfc:consistsOf [
       rdfc:instantiates rdfc:NodeRunner;
   ].
    

Step 4: Add the rdfc:HttpFetch processor

Import definition via owl:imports


<> owl:imports <./node_modules/@rdfc/http-utils-processor-ts/processors.ttl>.
    

Define the channel


<json> a rdfc:Reader, rdfc:Writer.
    

Define the processor instantiation


<fetcher> a rdfc:HttpFetch;
    rdfc:url "https://dataset.api.hub.geosphere.at/v1/station/current/tawes-v1-10min?parameters=TL,RR&station_ids=11035";
    rdfc:writer <json>.
    

Attach the processor to the runner


<> a rdfc:Pipeline;
   rdfc:consistsOf [
       rdfc:instantiates rdfc:NodeRunner;
       rdfc:consistsOf <fetcher> ].
    

Step 5: Add the rdfc:LogProcessorJs

Import definition via owl:imports


<> owl:imports <./node_modules/@rdfc/log-processor-ts/processor.ttl>.
    

Define the processor instantiation


<logger> a rdfc:LogProcessorJs;
    rdfc:reader <json>;
    rdfc:level "info";
    rdfc:label "output".
    

Attach the processor to the runner


[ rdfc:instantiates rdfc:NodeRunner;
  rdfc:consistsOf <fetcher>, <logger> ].
    

Step 6: Run the pipeline


npx rdfc pipeline.ttl
# or with debug logging:
LOG_LEVEL=debug npx rdfc pipeline.ttl
  

✅ Solution available in task-1 branch.

Now try task 0 & 1 yourself!

  1. Setup
  2. Initialize pipeline.ttl
  3. Add the rdfc:NodeRunner
  4. Add the rdfc:HttpFetch processor
  5. Add the rdfc:LogProcessorJs
  6. Run the pipeline
QR code for tutorial guideline repository
open.gent/r/semantics-repo

Data processing pipelines are crucial in modern data-centric systems

They enable the transformation, integration, and analysis of data across various sources and targets.

Pipelines are usually composed of multiple complex tasks

However, building, managing and reusing these pipelines can be complex and challenging.

Pipelines are ubiquitous in real-world systems

Stream processing: a computational paradigm for continuous and dynamic data systems

Traditional batch processing systems suffer from latency problems due to the need to collect input data into batches before it can be processed. — Isah, H., et al., A Survey of Distributed Data Stream Processing Frameworks, IEEE Access, 2019

Current real-world data systems often require real-time or near-real-time processing of dynamic data. Stream processing allows for the continuous ingestion and processing of data as it arrives, enabling timely insights and actions.

Cross-environment execution: choosing the best of all worlds

The ability to execute applications written in different programming languages in an integrated manner offers several advantages:

Polyglot Architecture

Declarative, reusable and provenance-aware data processing

Scientists want to use provenance data to answer questions such as: Which data items were involved in the generation of a given partial result? or Did this actor employ outputs from one of these two other actors? — Cuevas-Vicentin, V., et al., Scientific Workflows and Provenance: Introduction and Research Opportunities, Datenbank Spektrum, 2012
Provenance is instrumental to activities such as traceability, reproducibility, accountability, and quality assessment. — Herschel, M., et al., A Survey on Provenance: What for? What form? What from?, VLDB, 2017
Prospective provenance—the execution plan—is essentially the workflow itself: it includes a machine-readable specification with the processing steps to be performed and the data and software dependencies to carry out each computation. — Leo, S., et al., Recording provenance of workflow runs with RO-Crate, PLoS ONE, 2024

RDF-Connect requirements overview


Aren't there like a million pipeline frameworks already?

Related work


Existing pipeline frameworks
https://github.com/common-workflow-language/common-workflow-language/wiki/Existing-Workflow-systems
https://github.com/pditommaso/awesome-pipeline

A worthy mention:
Common Workflow Language

Common Workflow Language Logo

Common Workflow Language (CWL) is an open standard for describing how to run command line tools and connect them to create workflows.

Common Workflow Language Description

How is RDF-Connect different from CWL?

| Feature | RDF-Connect | CWL |
|---|---|---|
| Streaming | Supports both batch and streaming via gRPC streams | Primarily batch-oriented, although implementation-dependent streaming can be supported (e.g., using named pipes) |
| Polyglot | Supports any language through an add-in libraries approach | Can accommodate polylingual workflows via POSIX CLI interfaces |
| Provenance | Built-in prospective and retrospective provenance tracking based on PROV-O | Provenance extension available (CWLProv) based on PROV-O |
| Schema expressivity | Full SHACL-based expressivity | Set of defined types and limited constraint definitions |

Running example: the goal

  1. Retrieve JSON data from the GeoSphere weather API
  2. Transform the data into RDF using RML
  3. Translate language-tagged literals into another language
  4. Validate the RDF data against a SHACL shape
  5. Publish the RDF data to a triple store
Running example - the goal

Pipeline design of running example

Pipeline design architecture

Pipeline File Structure

A pipeline is described in RDF configuration files:

  • 🔗 Channels
    Define how data flows between processors.
  • 📦 Runners
    Specify which runtime environments are needed.
  • ⚙️ Processors
    Tasks that run inside a runner.
Pipeline file structure
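Put together, these three parts live in a single Turtle file. The sketch below simply recombines the snippets from the first pipeline we assembled (with a placeholder URL instead of the GeoSphere endpoint); it illustrates the file layout rather than introducing anything new:


@prefix rdfc: <https://w3id.org/rdf-connect#>.
@prefix owl: <http://www.w3.org/2002/07/owl#>.

# Runner and processor definitions are imported first
<> owl:imports <./node_modules/@rdfc/js-runner/index.ttl>,
               <./node_modules/@rdfc/http-utils-processor-ts/processors.ttl>,
               <./node_modules/@rdfc/log-processor-ts/processor.ttl>.

# Channel: how data flows between processors
<json> a rdfc:Reader, rdfc:Writer.

# Runner: which runtime environment hosts which processors
<> a rdfc:Pipeline;
   rdfc:consistsOf [
       rdfc:instantiates rdfc:NodeRunner;
       rdfc:consistsOf <fetcher>, <logger>
   ].

# Processors: the tasks that run inside the runner
<fetcher> a rdfc:HttpFetch;
    rdfc:url "https://example.org/data.json";   # placeholder URL
    rdfc:writer <json>.

<logger> a rdfc:LogProcessorJs;
    rdfc:reader <json>;
    rdfc:level "info";
    rdfc:label "output".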

Deep Dive: SHACL Shapes

Each runner and processor comes with a SHACL shape.
These shapes serve as the glue of RDF-Connect:

Deep Dive: SHACL Shapes Example

Deep Dive Orchestrator: Overview

Deep Dive Orchestrator: Responsibilities

  1. 📜 Initialize Pipeline
    1. ▶️ Start runners
      Launch each runner using its configured command.
    2. ⚙️ Initialize processors
      Instruct runners to start the processors they manage.
  2. 📡 Route messages
    Deliver incoming messages to the correct runner / processor.
Pipeline 1 Diagram

Deep Dive Runner: Overview

Currently, runners exist for JavaScript, JVM, and Python.

Deep Dive Runner: Responsibilities

  1. ▶️ Start from command line
    Runners can be launched as standalone processes.
  2. 🔌 Connect with orchestrator via gRPC
    Handle control messages and data exchange.
  3. ⚙️ Manage processors
    Start, stop, and monitor the processors they host, and forward their log messages.

Deep Dive Processor: Overview

Deep Dive: Centralized Logging

JavaScript Ecosystem

Java Ecosystem

Python Ecosystem

Finally! Lunch break 🍔

Hands-On

  1. 🌦 HTTP Fetch → Log contents
  2. 🔄️ Weather API → RDF → Log
  3. 🧩️ Weather API → RDF → Validation → Log
  4. 🚀️ Weather API → RDF → Validation → Publish → Log
  5. 🤖 Implement your own ML processor in Python
  6. ✅ Weather API → RDF → Translation → Validation → Publish → Log

Follow along in the GitHub repository.
All tasks are in the README. Each branch is a solution to a task!
open.gent/r/semantics-repo

QR code for tutorial guideline repository

What we already did this morning

Pipeline 1 Diagram

Follow along on branch task-1, or jump to the slides for a recap.

What we already did this morning


@prefix rdfc: <https://w3id.org/rdf-connect#>.
@prefix owl: <http://www.w3.org/2002/07/owl#>.
@prefix ex: <http://example.org/>.
      

Recap: Running the pipeline

Start the orchestrator with the configuration file:


npx rdfc pipeline.ttl
  
✅ You should see the HTTP contents being logged.

Hands-On: Pipeline

  1. 🌦 HTTP Fetch → Log contents
  2. 🔄️ Weather API → RDF → Log
  3. 🧩️ Weather API → RDF → Validation → Log
  4. 🚀️ Weather API → RDF → Validation → Publish → Log

Follow along in the GitHub repository.
All tasks are in the README. Each branch is a solution to a task!
open.gent/r/semantics-repo

QR code for tutorial guideline repository

Pipeline design: Weather KG

Pipeline 2 Diagram

Weather KG Pipeline: JavaScript Setup

Install the additionally required processors:


npm install @rdfc/file-utils-processors-ts
npm install @rdfc/shacl-processor-ts
npm install @rdfc/sparql-ingest-processor-ts
  

Weather KG Pipeline: Java Setup

Add the required dependency to your Gradle build file:


plugins { id 'java' }
repositories {
    mavenCentral()
    maven { url = uri("https://jitpack.io") }
}
dependencies {
    implementation("com.github.rdf-connect:rml-processor-jvm:master-SNAPSHOT:all")
}
tasks.register('copyPlugins', Copy) {
    from configurations.runtimeClasspath
    into "$buildDir/plugins"
}
    

Install jars with
gradle copyPlugins.

The jvm-runner itself is downloaded automatically; no installation is required.

If you do not want to use Gradle, you can also download the jars manually and put them in the build/plugins/ folder.


       wget 'https://jitpack.io/com/github/rdf-connect/rml-processor-jvm/master-SNAPSHOT/rml-processor-jvm-master-SNAPSHOT-all.jar'
    

Run the Weather KG Pipeline

Start the orchestrator with the configuration file:


npx rdfc pipeline.ttl
  
Don't forget to start a SPARQL endpoint!
→ We provide a docker-compose.yml with a Virtuoso instance configured.

Now try it yourself!

🛠️ Follow Part 1 in the repo (up to Task 4)

⏰ We continue with the presentation at 13:45

🙋 Ask questions!

Pipeline 2 Diagram
QR code for tutorial guideline repository
open.gent/r/semantics-repo

Hands-On

  1. 🌦 HTTP Fetch → Log contents
  2. 🔄️ Weather API → RDF → Log
  3. 🧩️ Weather API → RDF → Validation → Log
  4. 🚀️ Weather API → RDF → Validation → Publish → Log
  5. 🤖 Implement your own ML processor in Python
  6. Weather API → RDF → Translation → Validation → Publish → Log

Follow along in the GitHub repository.
All tasks are in the README. Each branch is a solution to a task!
open.gent/r/semantics-repo

QR code for tutorial guideline repository

Implement a processor

  1. Define the processor and its SHACL shape in processor.ttl
  2. Implement the processor class
    • Extend the abstract class
    • Implement the init, transform, and produce methods
    • Use Readers & Writers to handle messages
  3. Publish the processor
  4. Use the processor in a pipeline

Define the processor in processor.ttl


@prefix rdfc: <https://w3id.org/rdf-connect#>.
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>.
      
Python logo
JS/TS logo
Java logo

Define the processor in processor.ttl

                
@prefix rdfc: <https://w3id.org/rdf-connect#>.
@prefix sh: <http://www.w3.org/ns/shacl#>.
@prefix xsd: <http://www.w3.org/2001/XMLSchema#>.
                
            

sh:targetClass links back to the IRI used on the previous slide

Custom class for readers and writers: rdfc:Reader & rdfc:Writer

sh:name links to variable name in code

sh:path links to property in pipeline.ttl

Optional and multiple arguments with sh:minCount and sh:maxCount
(sh:maxCount != 1 results in a list of arguments)
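
As a rough sketch of how these elements fit together (the class rdfc:TranslationProcessor matches the instantiation used later in this tutorial; the extra language argument and the exact constraints are hypothetical and may differ from the template repositories):


@prefix rdfc: <https://w3id.org/rdf-connect#>.
@prefix sh: <http://www.w3.org/ns/shacl#>.
@prefix xsd: <http://www.w3.org/2001/XMLSchema#>.

[ ] a sh:NodeShape;
    sh:targetClass rdfc:TranslationProcessor;   # the IRI instantiated in pipeline.ttl
    sh:property [
        sh:name "reader";                        # variable name in the code
        sh:path rdfc:reader;                     # property used in pipeline.ttl
        sh:class rdfc:Reader;                    # custom class for incoming channels
        sh:minCount 1; sh:maxCount 1
    ], [
        sh:name "writer";
        sh:path rdfc:writer;
        sh:class rdfc:Writer;                    # custom class for outgoing channels
        sh:minCount 1; sh:maxCount 1
    ], [
        sh:name "languages";                     # hypothetical extra argument
        sh:path rdfc:language;                   # hypothetical property
        sh:datatype xsd:string;
        sh:minCount 0                            # optional; maxCount != 1 yields a list
    ].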

Implement the processor class

Python logo
JS/TS logo
Java logo

Start from a processor template repository

TypeScript
https://github.com/rdf-connect/template-processor-ts

Python
https://github.com/rdf-connect/template-processor-py

Java
https://github.com/rdf-connect/template-processor-jvm

Implement a Python processor

  1. In the transform method: consume the reader channel
  2. Parse the input using rdflib
  3. Identify language-tagged literals with @de
  4. Translate them to English using a ML model
  5. Emit both original and translated triples to the writer channel
  6. Define the processor in processor.ttl
processor flow
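
A minimal sketch of those steps, assuming an abstract Processor base class from rdfc_runner with async init/transform/produce hooks and reader/writer channel objects injected from pipeline.ttl; the actual base class and channel API may differ from what the template repository provides, and load_translation_model stands in for your ML model:


from rdflib import Graph, Literal
from rdfc_runner import Processor  # assumed import; check the template repository


class TranslationProcessor(Processor):
    async def init(self):
        # Arguments declared in the SHACL shape (reader, writer, ...) are available here.
        self.model = load_translation_model("de", "en")  # hypothetical ML model loader

    async def transform(self):
        # 1. Consume the reader channel message by message (hypothetical channel API).
        async for message in self.reader.strings():
            # 2. Parse the incoming RDF with rdflib.
            graph = Graph()
            graph.parse(data=message, format="turtle")

            out = Graph()
            for s, p, o in graph:
                out.add((s, p, o))  # keep the original triple
                # 3. Identify language-tagged literals with @de.
                if isinstance(o, Literal) and o.language == "de":
                    # 4. Translate them to English using the ML model.
                    out.add((s, p, Literal(self.model(str(o)), lang="en")))

            # 5. Emit both original and translated triples to the writer channel
            #    (hypothetical channel API).
            await self.writer.string(out.serialize(format="turtle"))

    async def produce(self):
        # Nothing to generate independently; all output is produced in transform.
        pass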

Add the Python runner

Set up the pyproject.toml for your pipeline

Configure a specific Python version to get a deterministic path to the dependencies.

Add the rdfc-runner as a dependency.
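
A rough sketch of such a pyproject.toml (the project name is illustrative, and the version pin simply matches the Python 3.13 path used in the next step):


[project]
name = "weather-kg-pipeline"      # illustrative project name
version = "0.1.0"
requires-python = "==3.13.*"      # pin the Python version so the .venv path is deterministic
dependencies = [
    "rdfc_runner",                # the RDF-Connect Python runner
]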

Add the Python runner

Install the runner:


uv add rdfc_runner
  

Import definition via owl:imports


<> owl:imports <./.venv/lib/python3.13/site-packages/rdfc_runner/index.ttl>.
    

Attach it to the pipeline declaration


<> a rdfc:Pipeline;
   rdfc:consistsOf [...], [
       rdfc:instantiates rdfc:PyRunner;
       rdfc:processor <translator>
   ].
    

Add your translation processor

Install your local processor after running hatch build


uv add ../processor/dist/rdfc_translation_processor.tar.gz
    

Import definition via owl:imports


<> owl:imports <./.venv/lib/python3.13/site-packages/rdfc_translation_processor/processor.ttl>.
    

Define the channel


<translated> a rdfc:Reader, rdfc:Writer.
    

Define the processor instantiation


<translator> a rdfc:TranslationProcessor;
    rdfc:reader <rdf>;
    rdfc:writer <translated>;
    ... .
    

Hands-On: Processor

  1. 🤖 Implement your own ML processor in Python
  2. ✅ Weather API → RDF → Translation → Validation → Publish → Log

Follow along in the GitHub repository.
All tasks are in the README. Each branch is a solution to a task!
open.gent/r/semantics-repo

QR code for tutorial guideline repository

Pipeline design: Weather KG With ML

Pipeline 3 Diagram

Now try it yourself!

🛠️ Follow Part 2 in the repo (Task 5 - 7)

🙋 Ask questions!

Pipeline 2 Diagram
QR code for tutorial guideline repository
open.gent/r/semantics-repo

Building Streaming and Cross-Environment Data Processing
Pipelines with RDF-Connect

Thank you!

We sincerely hope you enjoyed this tutorial
and found it valuable.

QR code to the RDF-Connect homepage
w3id.org/rdf-connect