Flink SQL User-Defined Functions (UDFs): Extending SQL with Custom Logic
Learn how to create and use User-Defined Functions in Flink SQL - scalar functions, table functions, and aggregate functions for custom stream processing logic.
Apache Flink SQL ships with hundreds of built-in functions for string manipulation, arithmetic, date handling, conditional logic, and more. For a wide range of stream processing tasks, those built-in functions are all you need. But production data pipelines inevitably encounter requirements that no standard function library can anticipate: proprietary serialization formats, company-specific business rules, third-party API lookups, or domain-specific calculations that simply do not exist as built-in operators.
This is where User-Defined Functions (UDFs) come in. UDFs let you write custom logic in Java, Scala, or Python and expose it to Flink SQL as if it were a native function. Once registered, a UDF can be called in any SELECT, WHERE, GROUP BY, or JOIN clause, giving you the full expressiveness of a general-purpose programming language without leaving the declarative SQL approach.
Scalar Functions: One-to-One Transformations
A scalar function accepts one or more input values from a single row and returns exactly one output value. This is the most common type of UDF and covers use cases like custom parsing, hashing, formatting, and enrichment.
Java Example
In Java, you create a scalar function by extending ScalarFunction and implementing an eval method. Flink uses reflection to discover the method signature.
import org.apache.flink.table.functions.ScalarFunction;

public class MaskEmail extends ScalarFunction {
    public String eval(String email) {
        if (email == null || !email.contains("@")) {
            return email;
        }
        // Split at most once so domains that contain "@" stay intact
        String[] parts = email.split("@", 2);
        if (parts[0].isEmpty()) {
            return email; // nothing to mask before the "@"
        }
        return parts[0].charAt(0) + "***@" + parts[1];
    }
}
Python Example
In PyFlink, the @udf decorator turns a regular Python function into a Flink scalar UDF.
from pyflink.table.udf import udf
from pyflink.table import DataTypes

@udf(result_type=DataTypes.STRING())
def mask_email(email: str) -> str:
    if email is None or "@" not in email:
        return email
    local, domain = email.split("@", 1)
    if not local:
        return email
    return f"{local[0]}***@{domain}"
Registration and Usage
After packaging your class (more on that below), register the function and call it like any built-in:
CREATE FUNCTION mask_email AS 'com.example.MaskEmail';
SELECT
user_id,
mask_email(email) AS masked_email,
signup_date
FROM user_events;
The function integrates directly into Flink’s query optimizer. It can be pushed down into projections, used in filter predicates, or combined with other functions in complex expressions.
Table Functions: One-to-Many Row Generation
Sometimes a single input row needs to produce multiple output rows. A table function accepts scalar input and emits zero or more rows, each with one or more columns. Classic use cases include exploding JSON arrays, splitting delimited strings, or generating date ranges.
In Java, you extend TableFunction<T> and call collect(T) for each output row inside your eval method.
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import com.fasterxml.jackson.databind.JsonNode;

public class ExplodeJsonArray extends TableFunction<Row> {
    public void eval(String jsonArray) {
        // parseArray is a user-supplied helper (e.g. backed by Jackson's
        // ObjectMapper) that parses the string and emits one row per element.
        for (JsonNode element : parseArray(jsonArray)) {
            collect(Row.of(
                element.get("key").asText(),
                element.get("value").asText()));
        }
    }
}
LATERAL TABLE Syntax
Table functions are invoked using LATERAL TABLE in a cross join or left join:
CREATE FUNCTION explode_json AS 'com.example.ExplodeJsonArray';
SELECT
o.order_id,
item.key,
item.value
FROM orders AS o,
LATERAL TABLE(explode_json(o.items_json)) AS item(key, value);
A LEFT JOIN LATERAL TABLE(...) ON TRUE variant preserves the parent row even when the table function emits zero results (Flink requires the literal ON TRUE as the join condition here). This matters when you cannot guarantee that every input contains valid data.
Aggregate Functions: Custom Multi-Row Aggregation
Aggregate functions consume many input rows and produce a single output value, just like SUM or COUNT. They are the most complex UDF type because they must support incremental accumulation and, for distributed execution, merging partial results from different partitions.
You implement an aggregate function by extending AggregateFunction<T, ACC>, where T is the result type and ACC is the accumulator type.
import org.apache.flink.table.functions.AggregateFunction;

public class WeightedAvg
        extends AggregateFunction<Double, WeightedAvg.Acc> {

    public static class Acc {
        public double weightedSum = 0;
        public double weightTotal = 0;
    }

    public Acc createAccumulator() {
        return new Acc();
    }

    public void accumulate(Acc acc, Double value, Double weight) {
        acc.weightedSum += value * weight;
        acc.weightTotal += weight;
    }

    public void merge(Acc acc, Iterable<Acc> others) {
        for (Acc other : others) {
            acc.weightedSum += other.weightedSum;
            acc.weightTotal += other.weightTotal;
        }
    }

    public Double getValue(Acc acc) {
        return acc.weightTotal == 0
            ? null
            : acc.weightedSum / acc.weightTotal;
    }
}
The merge method is important. In a distributed system like Flink, data is split across parallel subtasks. Each subtask accumulates its own partial result, and Flink calls merge to combine them. Without a correct merge implementation, your aggregate will produce incorrect results in parallel or session-window scenarios.
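To make the merge contract concrete, here is a plain-Python model of the same weighted-average accumulator (an illustrative sketch with no Flink dependency; the real UDF runs as Java inside the cluster). It shows that accumulating everything in one place and accumulating in two "subtasks" then merging produce the same result:

```python
class Acc:
    def __init__(self):
        self.weighted_sum = 0.0
        self.weight_total = 0.0

def accumulate(acc, value, weight):
    acc.weighted_sum += value * weight
    acc.weight_total += weight

def merge(acc, others):
    # Combine partial results produced by parallel subtasks.
    for other in others:
        acc.weighted_sum += other.weighted_sum
        acc.weight_total += other.weight_total

def get_value(acc):
    return None if acc.weight_total == 0 else acc.weighted_sum / acc.weight_total

rows = [(10.0, 1.0), (20.0, 3.0), (30.0, 1.0)]

# One subtask sees all rows...
single = Acc()
for v, w in rows:
    accumulate(single, v, w)

# ...versus two subtasks that each see a partition, then merge.
a, b = Acc(), Acc()
accumulate(a, *rows[0])
for v, w in rows[1:]:
    accumulate(b, v, w)
merge(a, [b])

assert get_value(single) == get_value(a)  # both give 20.0
```

If merge were missing or wrong, the two paths would diverge, which is exactly the bug that shows up only under parallel execution.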
CREATE FUNCTION weighted_avg AS 'com.example.WeightedAvg';
SELECT
product_category,
weighted_avg(price, quantity) AS avg_price
FROM sales
GROUP BY product_category;
Practical UDF Examples
Beyond the patterns above, here are several real-world UDFs that appear frequently in production pipelines.
Custom JSON Path Extraction
While Flink provides JSON_VALUE, some pipelines need to extract deeply nested fields with fallback logic or custom null handling that the built-in function does not support.
import com.jayway.jsonpath.JsonPath;
import org.apache.flink.table.functions.ScalarFunction;

public class SafeJsonExtract extends ScalarFunction {
    public String eval(String json, String path, String defaultValue) {
        try {
            String result = JsonPath.read(json, path);
            return result != null ? result : defaultValue;
        } catch (Exception e) {
            return defaultValue;
        }
    }
}
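The fallback contract is easy to model in a few lines of plain Python. This sketch uses the standard json module and a simple dotted path (rather than full JSONPath syntax) purely to illustrate the behavior:

```python
import json

def safe_json_extract(doc: str, path: str, default: str) -> str:
    """Extract a nested field by dotted path, returning `default` on any failure."""
    try:
        node = json.loads(doc)
        for key in path.split("."):
            node = node[key]
        return default if node is None else str(node)
    except Exception:
        return default

print(safe_json_extract('{"user": {"geo": {"city": "Oslo"}}}', "user.geo.city", "n/a"))  # Oslo
print(safe_json_extract('{"user": {}}', "user.geo.city", "n/a"))                          # n/a
print(safe_json_extract("not json", "user.geo.city", "n/a"))                              # n/a
```

The key design point is that every failure mode (malformed JSON, missing key, explicit null) collapses to the same default instead of failing the query.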
PII Masking and Hashing
Compliance requirements often demand that personally identifiable information be hashed or masked before it leaves the stream processing layer. A SHA-256 hashing UDF keeps raw PII out of downstream systems.
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import org.apache.commons.codec.binary.Hex;
import org.apache.flink.table.functions.ScalarFunction;

public class Sha256Hash extends ScalarFunction {
    public String eval(String input) {
        if (input == null) return null;
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
            return Hex.encodeHexString(hash);
        } catch (java.security.NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 exists on every JVM
        }
    }
}
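The same masking works in PyFlink with the standard library's hashlib. This sketch shows only the core logic as a plain function, which you would then wrap with the @udf decorator as in the earlier example:

```python
import hashlib

def sha256_hash(value: str) -> str:
    # Pass NULLs through unchanged, mirroring the Java UDF above.
    if value is None:
        return None
    return hashlib.sha256(value.encode("utf-8")).hexdigest()

digest = sha256_hash("alice@example.com")
print(len(digest))        # 64 hex characters
print(sha256_hash(None))  # None
```

Because SHA-256 is deterministic, the hashed value still supports joins and GROUP BY downstream while keeping the raw PII out of those systems.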
IP Geolocation Lookup
Enriching clickstream data with country or city derived from an IP address is a common real-time analytics requirement. A UDF backed by a local MaxMind GeoIP database can resolve this without an external API call on every row.
import java.io.File;
import java.net.InetAddress;
import com.maxmind.geoip2.DatabaseReader;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class IpToCountry extends ScalarFunction {
    private transient DatabaseReader reader;

    @Override
    public void open(FunctionContext ctx) throws Exception {
        reader = new DatabaseReader.Builder(
            new File("/opt/flink/geoip/GeoLite2-Country.mmdb"))
            .build();
    }

    public String eval(String ip) {
        try {
            return reader.country(InetAddress.getByName(ip))
                .getCountry().getName();
        } catch (Exception e) {
            return "Unknown";
        }
    }
}
Notice the open method. Flink calls it once per parallel function instance, before any rows are processed, making it the right place to load heavyweight resources like database files or HTTP connection pools.
Custom Timestamp Parsing
Legacy systems often emit timestamps in non-standard formats. A parsing UDF normalizes them into a format Flink can use natively.
CREATE FUNCTION parse_ts AS 'com.example.ParseTimestamp';
SELECT
event_id,
parse_ts(raw_timestamp, 'dd/MMM/yyyy:HH:mm:ss Z')
AS event_time
FROM raw_events;
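A minimal Python sketch of the parsing logic behind a UDF like the hypothetical parse_ts above (the Java pattern dd/MMM/yyyy:HH:mm:ss Z corresponds to %d/%b/%Y:%H:%M:%S %z in Python's strptime):

```python
from datetime import datetime, timezone

def parse_ts(raw: str) -> datetime:
    # Apache-access-log style timestamp, e.g. "10/Oct/2024:13:55:36 +0000".
    # A production UDF would also handle None and malformed input.
    local = datetime.strptime(raw, "%d/%b/%Y:%H:%M:%S %z")
    # Normalize to UTC so downstream windowing sees a single time zone.
    return local.astimezone(timezone.utc)

print(parse_ts("10/Oct/2024:13:55:36 -0700").isoformat())  # 2024-10-10T20:55:36+00:00
```

Normalizing to UTC inside the UDF keeps event-time semantics consistent regardless of which legacy system produced the record.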
Java vs. Python UDFs: Performance Considerations
Java and Scala UDFs run inside the JVM alongside the Flink runtime. There is no serialization boundary between the function and the rest of the query execution pipeline, which means Java UDFs add negligible overhead per row.
Python UDFs, on the other hand, run in a separate Python worker process. Flink serializes input data in batches, ships it to the Python process over a local channel, and deserializes the results back into the JVM; vectorized (pandas) UDFs use the Apache Arrow columnar format for this exchange. Either way, the round trip introduces measurable latency and CPU cost per batch.
For latency-sensitive, high-throughput pipelines, Java UDFs are the clear choice. Python UDFs make sense when you need access to Python-specific libraries (machine learning models via scikit-learn or PyTorch, NLP toolkits, specialized data science packages) and the throughput requirements are moderate. A common hybrid pattern is to prototype a UDF in Python during development and rewrite it in Java before promoting to production if performance testing reveals a bottleneck.
Type Inference and Explicit Type Hints
Flink tries to infer the return type of your UDF by inspecting the eval method signature. For simple types like String, Integer, or Boolean, this works automatically. For complex types such as arrays, maps, and Row types, automatic inference often fails, and you need to provide explicit type hints.
In Java, annotate the eval method with @DataTypeHint (or override getTypeInference for full control over input and output types):
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

public class ParseCoordinates extends ScalarFunction {
    @DataTypeHint("ROW<lat DOUBLE, lon DOUBLE>")
    public Row eval(String coords) {
        String[] parts = coords.split(",");
        return Row.of(
            Double.parseDouble(parts[0]),
            Double.parseDouble(parts[1]));
    }
}
In Python, specify the result_type parameter in the @udf decorator, as shown in the earlier scalar function example. Getting type hints right prevents cryptic runtime errors and ensures that downstream operators receive data in the expected format.
Testing UDFs Before Deployment
A UDF is just a class with methods, which means standard unit testing practices apply. Test your eval method directly without starting a Flink cluster.
@Test
public void testMaskEmail() {
    MaskEmail fn = new MaskEmail();
    assertEquals("j***@example.com", fn.eval("john@example.com"));
    assertNull(fn.eval(null));
    assertEquals("invalid", fn.eval("invalid"));
}
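The same practice applies to Python UDFs: keep the core logic in a plain function so tests can call it directly, without a Flink environment. A sketch using the mask_email logic from earlier:

```python
def mask_email(email):
    # Same masking logic as the @udf-decorated version, kept as a plain
    # function so unit tests need no PyFlink environment.
    if email is None or "@" not in email:
        return email
    local, domain = email.split("@", 1)
    if not local:
        return email
    return f"{local[0]}***@{domain}"

# Direct unit tests, no cluster required.
assert mask_email("john@example.com") == "j***@example.com"
assert mask_email(None) is None
assert mask_email("invalid") == "invalid"
```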
For integration testing, Flink provides a MiniClusterExtension (JUnit 5) that spins up an in-process Flink cluster. You can register your UDF, execute a SQL query against a test table source, and assert the results. This catches issues that unit tests miss, such as serialization problems, type inference failures, and state management bugs in aggregate functions.
@Test
public void testUdfInSql() {
    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.inStreamingMode());
    tEnv.createTemporaryFunction("mask_email", MaskEmail.class);
    Table result = tEnv.sqlQuery(
        "SELECT mask_email('alice@test.com')");
    // collect and assert...
}
Deployment and Registration
JAR Packaging
Package your UDF classes and their dependencies into a single uber-JAR (fat JAR) using Maven Shade or Gradle Shadow. Exclude Flink’s own libraries to avoid classpath conflicts, since the cluster already provides them.
<!-- Maven Shade plugin excerpt -->
<artifactSet>
  <excludes>
    <exclude>org.apache.flink:*</exclude>
  </excludes>
</artifactSet>
Registering Functions
Place the JAR on Flink’s classpath (the lib/ directory) or, on platforms that support it, upload it through a UI or API. Then register using CREATE FUNCTION:
-- Permanent function (persisted in catalog)
CREATE FUNCTION my_catalog.my_db.mask_email
AS 'com.example.MaskEmail';
-- Temporary function (session-scoped)
CREATE TEMPORARY FUNCTION mask_email
AS 'com.example.MaskEmail';
Permanent functions stored in a catalog like Hive Metastore or Flink’s built-in catalog survive session restarts and are visible to all jobs that share that catalog. Temporary functions exist only for the duration of the current session. In managed Flink platforms like Streamkap, UDF JARs are typically uploaded through the platform interface and registered as part of the pipeline configuration, removing the need to manage cluster classpaths manually.
When to Use UDFs vs. Built-In Functions
UDFs are powerful, but they come with maintenance costs: you own the code, the tests, the JAR versioning, and the upgrade path. Before writing a UDF, check whether a built-in function or a combination of built-in functions already solves the problem.
Use built-in functions when:
- The operation is a standard string, math, date, or conditional transformation.
- Flink’s JSON functions (JSON_VALUE, JSON_QUERY, JSON_EXISTS) cover your extraction needs.
- A CASE expression or COALESCE can express your conditional logic.
Use a UDF when:
- The logic needs an external library (GeoIP lookup, ML model inference, custom serialization).
- Business rules are complex enough that expressing them in pure SQL would be unreadable or unmaintainable.
- You need to call an external service or load a resource file during function initialization.
- The same custom logic is reused across many queries, making a named function cleaner than duplicated SQL expressions.
For teams running pipelines on Streamkap, many common data shaping tasks like field mapping, filtering, type conversion, and masking are handled by built-in transforms that require zero custom code. When those built-in options are not enough, custom Flink SQL with UDFs provides an escape hatch for arbitrarily complex logic, keeping your pipeline fully within the SQL layer while extending it exactly where you need to.
UDFs are one of the features that make Flink SQL a full stream processing language rather than a limited query interface. Learning the three function types, understanding the performance tradeoffs between Java and Python, and investing in proper testing and deployment practices will let your team handle virtually any transformation requirement without dropping out of SQL into a lower-level API.