tiles/flink-sql/skills/flink-sql/SKILL.md
Apache Flink SQL, Table API, and UDF development for both OSS Flink and Confluent Cloud. Use when: (1) Writing Flink SQL queries (windows, joins, aggregations, MATCH_RECOGNIZE), (2) Building Table API pipelines in Java or Python, (3) Creating UDFs (scalar, table functions) for Flink, (4) Deploying Flink jobs to Confluent Cloud, (5) Converting between DataStream and Table API, (6) Troubleshooting Flink SQL errors. Covers windowing, event-time processing, watermarks, state management, and Confluent-specific patterns.
Stream processing with SQL semantics. Tables that change over time — think of them as movies, not photographs.
| Task | Approach |
|------|----------|
| Simple transformations | Flink SQL SELECT/WHERE/GROUP BY |
| Windowed aggregation | Window TVFs (TUMBLE/HOP/SESSION/CUMULATE) |
| Pattern detection | MATCH_RECOGNIZE |
| Custom scalar logic | UDF (ScalarFunction) |
| One-to-many expansion | UDTF (TableFunction) |
| Stateful processing | Process Table Function (PTF); see ptf-guide.md |
| Join streams | Interval joins, temporal joins, lookup joins |
| Deduplication | ROW_NUMBER() with OVER clause |
| Top-N queries | ROW_NUMBER() OVER (ORDER BY ...) |
| Row-to-row comparison | LAG/LEAD window functions |
| Late data routing | CURRENT_WATERMARK() + Statement Sets |
| Multi-level aggregation | Chained windows (fine → coarse) |
Traditional DB: Table = snapshot (photograph)
Flink: Table = changelog (movie)
Every INSERT/UPDATE/DELETE is an event in the changelog.
SQL queries become continuous — results update as data arrives.
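Changelog modes: append (insert-only rows, +I), upsert (inserts, updates, and deletes keyed by the primary key), and retract (each update is emitted as a retraction of the old row, -U, followed by the new row, +U).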
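As a rough illustration of the "movie" view, the plain-Python sketch below (not Flink code) mimics the retract-style changelog that a continuous `SELECT key, COUNT(*) ... GROUP BY key` would produce. The function name `count_changelog` and the sample keys are hypothetical.

```python
from collections import defaultdict

def count_changelog(keys):
    """Return retract-style changelog rows for a running COUNT(*) per key."""
    counts = defaultdict(int)
    changelog = []
    for key in keys:
        old = counts[key]
        if old == 0:
            changelog.append(("+I", key, 1))        # first result for this key
        else:
            changelog.append(("-U", key, old))      # retract the previous result
            changelog.append(("+U", key, old + 1))  # emit the updated result
        counts[key] = old + 1
    return changelog

for row in count_changelog(["alice", "bob", "alice"]):
    print(row)
```

The second `alice` event does not add a row to the result; it replaces the earlier count, which is why downstream sinks need to understand updates.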
Java Table API:
<dependency>
    <groupId>io.confluent.flink</groupId>
    <artifactId>confluent-flink-table-api-java-plugin</artifactId>
    <version>2.1-8</version>
</dependency>
import io.confluent.flink.plugin.ConfluentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Reads the connection settings from environment variables
TableEnvironment env = TableEnvironment.create(
    ConfluentSettings.fromGlobalVariables()
);
Python Table API:
pip install confluent-flink-table-api-python-plugin
from pyflink.table.confluent import ConfluentSettings
from pyflink.table import TableEnvironment
settings = ConfluentSettings.from_global_variables()
env = TableEnvironment.create(settings)
Required environment variables:
export CLOUD_PROVIDER="aws"
export CLOUD_REGION="us-east-1"
export FLINK_API_KEY="<key>"
export FLINK_API_SECRET="<secret>"
export ORG_ID="<org-id>"
export ENV_ID="<env-id>"
export COMPUTE_POOL_ID="<pool-id>"
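Before creating the settings object it can help to fail fast when one of these variables is missing. The check below is a minimal sketch using only the standard library; the helper name `missing_confluent_vars` is hypothetical and not part of the Confluent plugin.

```python
import os

REQUIRED_VARS = (
    "CLOUD_PROVIDER", "CLOUD_REGION", "FLINK_API_KEY", "FLINK_API_SECRET",
    "ORG_ID", "ENV_ID", "COMPUTE_POOL_ID",
)

def missing_confluent_vars(environ=None):
    """Return the required variable names that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not env.get(name)]

missing = missing_confluent_vars()
if missing:
    print("Missing environment variables:", ", ".join(missing))
```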
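OSS Flink (Java Table API):
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    .build();
TableEnvironment env = TableEnvironment.create(settings);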
Tumbling window (fixed, non-overlapping):
SELECT
window_start, window_end,
COUNT(*) as cnt,
SUM(amount) as total
FROM TABLE(
TUMBLE(TABLE orders, DESCRIPTOR(event_time), INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end;
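To see which window a row lands in, the plain-Python sketch below (not Flink code) reproduces the assignment arithmetic for the one-hour tumbling window above, assuming windows are aligned to the Unix epoch with no offset. The helpers `tumble_window`, `hourly_totals`, and `at` are hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def tumble_window(ts, size):
    """Return (window_start, window_end) of the tumbling window containing ts."""
    start = ts - (ts - EPOCH) % size  # align to a multiple of size since the epoch
    return start, start + size        # window_end is exclusive

def hourly_totals(orders):
    """Aggregate (event_time, amount) pairs into {window: (cnt, total)}."""
    totals = {}
    for event_time, amount in orders:
        window = tumble_window(event_time, timedelta(hours=1))
        cnt, total = totals.get(window, (0, 0.0))
        totals[window] = (cnt + 1, total + amount)
    return totals

def at(hour, minute):
    """Hypothetical helper: a UTC timestamp on 2024-01-01."""
    return datetime(2024, 1, 1, hour, minute, tzinfo=timezone.utc)

orders = [(at(10, 5), 20.0), (at(10, 59), 5.0), (at(11, 0), 7.5)]
for (start, end), (cnt, total) in sorted(hourly_totals(orders).items()):
    print(start.time(), end.time(), cnt, total)
```

An order at exactly 11:00 belongs to the 11:00 to 12:00 window, because the window end is exclusive.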
Hopping window (overlapping):
SELECT window_start, window_end, AVG(price)
FROM TABLE(
HOP(TABLE trades, DESCRIPTOR(ts), INTERVAL '5' MINUTE, INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end;
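With a 5-minute slide and a 1-hour size, every row falls into several overlapping windows. The sketch below (plain Python, timestamps as epoch seconds, no window offset assumed) lists the windows that contain one event; `hop_windows` is a hypothetical name.

```python
def hop_windows(ts, slide, size):
    """Return all (start, end) hopping windows containing ts, earliest first."""
    start = ts - ts % slide           # latest window start at or before ts
    windows = []
    while start > ts - size:          # window still covers ts: start <= ts < start + size
        windows.append((start, start + size))
        start -= slide
    return windows[::-1]

ts = 1_700_000_000                    # arbitrary sample timestamp
windows = hop_windows(ts, slide=5 * 60, size=60 * 60)
print(len(windows), windows[0], windows[-1])
```

Each event contributes to size / slide windows (12 here), which is worth keeping in mind when estimating state size and output volume.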
Session window (gap-based):
SELECT window_start, window_end, user_id, COUNT(*)
FROM TABLE(
SESSION(TABLE clicks PARTITION BY user_id, DESCRIPTOR(click_time), INTERVAL '30' MINUTE)
)
GROUP BY window_start, window_end, user_id;
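Session boundaries depend on the data rather than on the clock. The sketch below (plain Python, epoch seconds) merges one user's clicks into sessions with a 30-minute gap. It is a simplification: `session_windows` is a hypothetical name, and the behaviour when two clicks are exactly one gap apart is an assumption here, so the sample data avoids that case.

```python
def session_windows(timestamps, gap):
    """Return (window_start, window_end, count) sessions for one key."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            # Click arrives before the session times out: extend the session
            start, _end, count = sessions[-1]
            sessions[-1] = (start, ts + gap, count + 1)
        else:
            # Inactivity exceeded the gap (or first click): open a new session
            sessions.append((ts, ts + gap, 1))
    return sessions

clicks = [0, 600, 5000]               # seconds; 600 -> 5000 is a pause of over an hour
print(session_windows(clicks, gap=30 * 60))
```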
Interval join (time-bounded):
SELECT o.*, s.ship_time
FROM orders o, shipments s
WHERE o.order_id = s.order_id
AND s.ship_time BETWEEN o.order_time AND o.order_time + INTERVAL '4' HOUR;
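The time bound is what lets Flink discard old rows from join state. As a rough model of the predicate only (plain Python, epoch seconds, no state cleanup), the sketch below pairs orders with shipments that arrive within four hours; `interval_join` is a hypothetical name.

```python
def interval_join(orders, shipments, max_delay):
    """Join (order_id, order_time) with (order_id, ship_time) inside the interval."""
    order_times = {}
    for order_id, order_time in orders:
        order_times.setdefault(order_id, []).append(order_time)
    joined = []
    for order_id, ship_time in shipments:
        for order_time in order_times.get(order_id, []):
            # BETWEEN is inclusive on both ends
            if order_time <= ship_time <= order_time + max_delay:
                joined.append((order_id, order_time, ship_time))
    return joined

four_hours = 4 * 3600
orders = [("o1", 0), ("o2", 100)]
shipments = [("o1", 3600), ("o2", 100 + four_hours + 1), ("o3", 50)]
print(interval_join(orders, shipments, four_hours))
```

Only `o1` joins: the `o2` shipment is one second too late and `o3` has no matching order.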
Temporal join (point-in-time lookup):
SELECT o.*, r.rate
FROM orders o
JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;
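A temporal join picks the version of the rate that was valid at each order's timestamp. The sketch below models that lookup in plain Python, assuming a version becomes valid at its own timestamp (inclusive) and that orders without a matching version are dropped, as in an inner join. The names `rate_as_of` and `temporal_join` are hypothetical.

```python
from bisect import bisect_right

def rate_as_of(versions, ts):
    """Return the rate valid at ts from (valid_from, rate) pairs sorted by valid_from."""
    idx = bisect_right([valid_from for valid_from, _ in versions], ts)
    return versions[idx - 1][1] if idx else None

def temporal_join(orders, rates):
    """Enrich (order_id, currency, order_time) with the rate as of order_time."""
    joined = []
    for order_id, currency, order_time in orders:
        rate = rate_as_of(rates.get(currency, []), order_time)
        if rate is not None:
            joined.append((order_id, currency, rate))
    return joined

rates = {"EUR": [(0, 1.10), (100, 1.20)]}
orders = [("o1", "EUR", 50), ("o2", "EUR", 100), ("o3", "GBP", 10)]
print(temporal_join(orders, rates))
```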
Lookup join (external table):
SELECT o.*, c.name
FROM orders o
JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
Deduplication (keep the latest row per user_id):
SELECT *
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY event_time DESC) AS rn
    FROM events
)
WHERE rn = 1;
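The effect of `rn = 1` with `ORDER BY event_time DESC` is "latest row wins per key". A minimal plain-Python sketch of that result, with hypothetical sample events and a hypothetical helper name `latest_per_key`:

```python
def latest_per_key(events):
    """Keep, for each user_id, the event with the greatest event_time."""
    latest = {}
    for event in events:
        current = latest.get(event["user_id"])
        if current is None or event["event_time"] > current["event_time"]:
            latest[event["user_id"]] = event
    return latest

events = [
    {"user_id": "u1", "event_time": 10, "page": "/home"},
    {"user_id": "u2", "event_time": 11, "page": "/cart"},
    {"user_id": "u1", "event_time": 12, "page": "/checkout"},
]
print(latest_per_key(events))
```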
Top-N (top 10 products per category by sales):
SELECT *
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS rn
    FROM products
)
WHERE rn <= 10;
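The same ranking can be pictured as a per-category "keep the N largest" step. The sketch below is plain Python over a finite list, so it shows the final result rather than the continuously updated one; `top_n_per_category` and the sample rows are hypothetical.

```python
import heapq
from collections import defaultdict

def top_n_per_category(products, n):
    """Return {category: [(name, sales), ...]} with the n best sellers, highest first."""
    by_category = defaultdict(list)
    for category, name, sales in products:
        by_category[category].append((name, sales))
    return {
        category: heapq.nlargest(n, rows, key=lambda row: row[1])
        for category, rows in by_category.items()
    }

products = [
    ("books", "novel", 120), ("books", "atlas", 40), ("books", "comic", 75),
    ("toys", "robot", 300),
]
print(top_n_per_category(products, n=2))
```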
Pattern detection (one or more orders under 100 followed by an order of 100 or more):
SELECT *
FROM orders
MATCH_RECOGNIZE (
    PARTITION BY user_id
    ORDER BY event_time
    MEASURES
        FIRST(A.event_time) AS start_time,
        LAST(B.event_time) AS end_time,
        COUNT(A.amount) AS cnt
    ONE ROW PER MATCH
    PATTERN (A+ B)
    DEFINE
        A AS A.amount < 100,
        B AS B.amount >= 100
);
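To make the `A+ B` pattern concrete, the sketch below walks one user's rows in event-time order and reports each run of small amounts that ends with a large one. It is a simplified model in plain Python that assumes non-null amounts, a greedy `A+`, and non-overlapping matches (as with `AFTER MATCH SKIP PAST LAST ROW`); `match_low_then_high` is a hypothetical name.

```python
def match_low_then_high(rows, threshold=100):
    """Find runs of amounts below threshold followed by one at or above it."""
    matches = []
    run = []                                  # rows currently mapped to variable A
    for event_time, amount in rows:           # rows must be ordered by event_time
        if amount < threshold:
            run.append((event_time, amount))
        else:
            if run:                           # this row is B and completes A+ B
                matches.append({
                    "start_time": run[0][0],  # FIRST(A.event_time)
                    "end_time": event_time,   # LAST(B.event_time)
                    "cnt": len(run),          # COUNT(A.amount)
                })
            run = []                          # a large amount always ends the run
    return matches

rows = [(1, 50), (2, 60), (3, 150), (4, 200), (5, 30), (6, 120)]
print(match_low_then_high(rows))
```

The row at time 4 produces no match because no small order precedes it directly.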
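Table API (Java):
import org.apache.flink.table.api.Table;
import static org.apache.flink.table.api.Expressions.$;

// Total amount of completed orders per customer
Table orders = env.from("orders");
Table result = orders
    .filter($("status").isEqual("completed"))
    .select($("order_id"), $("amount"), $("customer_id"))
    .groupBy($("customer_id"))
    .select($("customer_id"), $("amount").sum().as("total"));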
Mixing SQL and Table API (Java):
// SQL → Table
Table fromSql = env.sqlQuery("SELECT * FROM orders WHERE amount > 100");
// Table → SQL
env.createTemporaryView("filtered_orders", fromSql);
Table fromTable = env.sqlQuery("SELECT customer_id, SUM(amount) FROM filtered_orders GROUP BY customer_id");
Confluent tools (Java):
import io.confluent.flink.plugin.ConfluentTools;

// Print results (limited); table is any Table, e.g. env.from("orders")
ConfluentTools.printMaterializedLimit(table, 100);
// Collect results
List<Row> rows = ConfluentTools.collectMaterializedLimit(table, 100);
// Statement lifecycle
TableResult result = env.executeSql("SELECT * FROM orders");
String statementName = ConfluentTools.getStatementName(result);
ConfluentTools.stopStatement(result);
Table descriptor (Confluent Cloud, Java):
TableDescriptor descriptor = ConfluentTableDescriptor.forManaged()
    .schema(Schema.newBuilder()
        .column("id", DataTypes.INT())
        .column("data", DataTypes.STRING())
        .watermark("$rowtime", $("$rowtime").minus(lit(5).seconds()))
        .build())
    .build();
env.createTable("my_table", descriptor);
For UDF development patterns, templates, and deployment, see udf-guide.md.
Scalar UDF (Java):
import org.apache.flink.table.functions.ScalarFunction;

public class MyUpperCase extends ScalarFunction {
    // Null-safe: a NULL input yields NULL
    public String eval(String s) {
        return s == null ? null : s.toUpperCase();
    }
}
// Register and use
env.createTemporaryFunction("my_upper", MyUpperCase.class);
env.sqlQuery("SELECT my_upper(name) FROM users");
Deploy a UDF to Confluent Cloud:
# Build JAR
mvn clean package
# Upload artifact
confluent flink artifact create my-udf \
    --cloud aws --region us-east-1 \
    --artifact-file target/my-udf-1.0.jar
-- Register function (Flink SQL)
CREATE FUNCTION my_upper
    AS 'com.example.MyUpperCase'
    USING JAR 'confluent-artifact://cfa-xxxxx';
Declare watermark in DDL:
CREATE TABLE events (
event_id STRING,
event_time TIMESTAMP(3),
payload STRING,
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (...);
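The 5-second delay in the declaration above decides which rows count as late. The sketch below is a simplified plain-Python model: it advances the watermark on every row, whereas Flink emits watermarks periodically and combines them across partitions, so treat it as an intuition aid only. `split_late` is a hypothetical name.

```python
def split_late(event_times, delay):
    """Split event times (in arrival order) into on-time and late lists."""
    watermark = None
    on_time, late = [], []
    for ts in event_times:
        if watermark is not None and ts <= watermark:
            late.append(ts)               # arrived after the watermark passed it
        else:
            on_time.append(ts)
        candidate = ts - delay            # event_time - INTERVAL 'delay' SECOND
        if watermark is None or candidate > watermark:
            watermark = candidate         # watermarks never move backwards
    return on_time, late

arrivals = [10, 12, 20, 14, 16, 30, 21]   # seconds, in arrival order
print(split_late(arrivals, delay=5))
```

Raising the delay makes fewer rows late at the cost of higher result latency, which is the trade-off behind the "Late data dropped" entry in the error table.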
Watermark strategies:
- `event_time - INTERVAL 'n' SECOND`: bounded out-of-orderness
- `event_time`: strictly ascending (no late data)
- `SOURCE_WATERMARK()`: preserve source watermarks

Common errors:

| Error | Cause | Fix |
|-------|-------|-----|
| Cannot resolve watermark | Missing watermark declaration | Add WATERMARK FOR col AS ... |
| Schema mismatch | Column types don't align | Check data types with DESCRIBE table |
| State too large | Unbounded aggregation | Add state TTL or use windows |
| Late data dropped | Watermark too aggressive | Increase watermark delay, or route late data with CURRENT_WATERMARK() (see sql-patterns.md) |
| UDF not found | Function not registered | Check catalog/database scope |
For detailed troubleshooting, see troubleshooting.md.