skills/data-engineering/stream-processing/SKILL.md
Process continuous data streams in real time with frameworks such as Kafka and Flink to build efficient data engineering pipelines.
This skill covers real-time processing of continuous data streams, with Apache Kafka as the transport layer and Apache Flink or Spark Structured Streaming as the processing engines. It targets scenarios that need immediate data ingestion, transformation, and analysis within data engineering pipelines.
Use this skill for high-volume data sources like IoT sensors, log files, or financial transactions that need real-time analytics. Apply it when batch processing is insufficient, such as monitoring system metrics, detecting anomalies, or updating dashboards dynamically.
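Send messages with kafka-console-producer --topic my-topic --bootstrap-server localhost:9092, and consume them with kafka-console-consumer --topic my-topic --from-beginning --bootstrap-server localhost:9092. Older Kafka releases only accepted --broker-list for the console producer; that option is deprecated in favor of --bootstrap-server.
Process streams with Flink by submitting a job: flink run -c com.example.StreamJob /path/to/jar --input kafka-topic --output file:///output. Arguments after the JAR path (here --input and --output) are passed to the job's main method rather than interpreted by the Flink CLI. Monitor jobs through Flink's REST API at http://localhost:8081/jobs/overview.
With Spark Structured Streaming, read a topic with val stream = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "my-topic").load(); one of subscribe, subscribePattern, or assign must be set. Then apply transformations and start the query: stream.selectExpr("CAST(value AS STRING)").writeStream.outputMode("append").format("console").start().
Kafka clients are configured with Java properties, e.g., key.serializer=org.apache.kafka.common.serialization.StringSerializer for producers. Flink uses YAML for its configuration, e.g., execution.checkpointing.interval: 1min.
To use Kafka as a source in a Flink job, add the Kafka connector dependency (e.g., via Maven: <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId></dependency>, with a <version> compatible with your Flink release).
For authenticated clusters, put the credentials in the client properties file passed with --producer.config. The console producer does not pick up $KAFKA_API_KEY automatically; the value has to end up in config.properties (for example, written there by your script) before you run: export KAFKA_API_KEY=your_key; kafka-console-producer --topic my-topic --bootstrap-server localhost:9092 --producer.config /path/to/config.properties.
Connect Spark to Kafka with Spark's Kafka connector (the spark-sql-kafka-0-10 package), checking version compatibility with your cluster (e.g., Spark 3.x with Kafka 2.8+). Keep connection settings and secrets for external services in environment variables, e.g., $SPARK_MASTER_URL for the Spark cluster's master URL.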
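The properties file given to --producer.config must contain the credentials themselves, so a script has to carry $KAFKA_API_KEY into it. The following is a minimal sketch of such a generator using only the JDK. It assumes a cluster secured with SASL/PLAIN over TLS, where the API key acts as the SASL username and a secret acts as the password (a common convention for hosted Kafka, not a universal rule). KafkaClientConfig, buildProducerConfig, and the $KAFKA_API_SECRET variable are hypothetical names, not part of Kafka.

```java
import java.io.FileWriter;
import java.io.IOException;
import java.util.Properties;

public class KafkaClientConfig {

    // Producer settings for a cluster using SASL/PLAIN over TLS.
    // ASSUMPTION: the API key is the SASL username and the secret is the password;
    // check your provider's documentation for the mechanism it actually expects.
    public static Properties buildProducerConfig(String bootstrapServers, String apiKey, String apiSecret) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + apiKey + "\" password=\"" + apiSecret + "\";");
        return props;
    }

    public static void main(String[] args) throws IOException {
        String apiKey = System.getenv("KAFKA_API_KEY");
        String apiSecret = System.getenv("KAFKA_API_SECRET"); // hypothetical variable name
        if (apiKey == null || apiSecret == null) {
            System.err.println("Set KAFKA_API_KEY and KAFKA_API_SECRET before running.");
            System.exit(1);
        }
        String path = args.length > 0 ? args[0] : "config.properties";
        // Properties.store escapes characters such as ':' and '=' so that any
        // Java properties reader, including the Kafka tools, parses them back correctly.
        try (FileWriter out = new FileWriter(path)) {
            buildProducerConfig("localhost:9092", apiKey, apiSecret) // replace with your cluster's address
                    .store(out, "Generated Kafka producer config");
        }
    }
}
```

With Java 11 or later this can be run directly as java KafkaClientConfig.java /path/to/config.properties after exporting both variables, and the resulting file passed to kafka-console-producer via --producer.config. The generated file contains a secret, so restrict its permissions.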
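Handle Kafka connection errors by retrying producer sends with exponential backoff. KafkaProducer.send() is asynchronous, so most send errors surface through the returned Future (or a callback) rather than as an exception from send() itself; to catch them in a retry loop, block on the result: try { producer.send(record).get(); } catch (ExecutionException e) { Thread.sleep(1000L << attempts); }. The producer also retries transient errors internally, governed by the retries, retry.backoff.ms, and delivery.timeout.ms settings. In Flink, enable a restart strategy such as env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS))) to recover from task failures. For Spark, set a checkpoint location on streaming queries, writeStream.option("checkpointLocation", "/path/to/checkpoints").start(), so that state and progress are restored after a failure. Log errors in a structured format (e.g., via SLF4J) and monitor with tools like Prometheus for real-time alerts.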
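The retry-with-backoff idea for producer sends can be sketched without any Kafka dependency. This is a minimal illustration, not part of the Kafka client API: Backoff and retryWithBackoff are hypothetical names, and the delay doubles after each failed attempt (base, 2x base, 4x base, and so on) until maxAttempts is reached, at which point the last error is rethrown.

```java
import java.util.concurrent.Callable;

public class Backoff {

    // Calls `action` up to maxAttempts times. After the n-th failure it sleeps
    // baseDelayMs * 2^(n-1) ms before trying again; the last failure is rethrown.
    public static <T> T retryWithBackoff(Callable<T> action, int maxAttempts, long baseDelayMs)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException ie) {
                // Don't retry on interruption: restore the interrupt flag and stop.
                Thread.currentThread().interrupt();
                throw ie;
            } catch (Exception e) {
                if (attempt >= maxAttempts) {
                    throw e; // out of attempts: surface the last error
                }
                Thread.sleep(baseDelayMs << (attempt - 1)); // base, 2x base, 4x base, ...
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated send that fails twice (e.g., broker briefly unavailable), then succeeds.
        String result = retryWithBackoff(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("broker unavailable");
            }
            return "sent";
        }, 5, 100);
        System.out.println(result + " after " + calls[0] + " attempts"); // sent after 3 attempts
    }
}
```

With a real producer, the action would be a blocking send such as Backoff.retryWithBackoff(() -> producer.send(record).get(), 5, 500), where producer and record come from your own client code; the .get() matters because send() alone returns before delivery fails. In production you would typically also cap the delay and add jitter, and keep in mind that this loop sits on top of the producer's own internal retries.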
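Example, log filtering: publish log lines with kafka-console-producer --topic logs --bootstrap-server localhost:9092, then run a Flink job: flink run -c com.example.LogProcessor /path/to/jar --input logs. The job keeps only error lines: env.addSource(new FlinkKafkaConsumer<>("logs", ...)).filter(line -> line.contains("ERROR")).print(). Note that FlinkKafkaConsumer is deprecated in recent Flink releases in favor of KafkaSource.
Example, IoT sensor averages: val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "sensors").load(). The Kafka source delivers value as bytes, so cast it before aggregating (this assumes each message is a plain numeric string), with import org.apache.spark.sql.functions.window and import spark.implicits._ in scope: df.selectExpr("CAST(CAST(value AS STRING) AS DOUBLE) AS value", "timestamp").groupBy(window($"timestamp", "1 minute")).avg("value").writeStream.outputMode("complete").format("console").start(). Because the query aggregates, it needs complete output mode (or a watermark if you want append mode). This produces minute-level averages for the sensor stream.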
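To make the semantics of window($"timestamp", "1 minute") followed by avg("value") concrete, here is a plain-Java sketch of the same tumbling-window average, without Spark. Each reading is assigned to the fixed, non-overlapping one-minute window starting at floor(timestamp / 60 s) * 60 s (Spark aligns windows to the epoch by default), and values are averaged per window. TumblingWindowAverage and Reading are hypothetical names. Unlike Spark, the sketch works on a finite list and ignores late data, watermarks, and state cleanup.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowAverage {

    // One sensor reading: event time in epoch milliseconds plus a numeric value.
    public record Reading(long timestampMs, double value) {}

    // Returns window start (ms) -> average value, ordered by window start.
    public static Map<Long, Double> averageByWindow(List<Reading> readings, long windowSizeMs) {
        Map<Long, double[]> acc = new TreeMap<>(); // window start -> {sum, count}
        for (Reading r : readings) {
            // floorDiv keeps window starts correct for negative timestamps too.
            long start = Math.floorDiv(r.timestampMs(), windowSizeMs) * windowSizeMs;
            double[] sumCount = acc.computeIfAbsent(start, k -> new double[2]);
            sumCount[0] += r.value();
            sumCount[1] += 1;
        }
        Map<Long, Double> result = new TreeMap<>();
        acc.forEach((start, sumCount) -> result.put(start, sumCount[0] / sumCount[1]));
        return result;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        List<Reading> readings = List.of(
                new Reading(5_000L, 20.0),
                new Reading(30_000L, 22.0),
                new Reading(65_000L, 25.0));
        // Prints "window@0 avg=21.0" then "window@60000 avg=25.0".
        averageByWindow(readings, minute)
                .forEach((start, avg) -> System.out.println("window@" + start + " avg=" + avg));
    }
}
```

Spark maintains the same per-window sum and count as incremental state across micro-batches, which is why the streaming query needs complete mode or a watermark before it can emit results.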