Flink Version | Description | Source Code |
1.11 | Unsupported | - |
1.13 | Supported | |
1.14 | Supported | |
1.16 | Supported | |
```sql
CREATE TABLE PulsarTable (
  `user_id` bigint,
  `item_id` bigint,
  `behavior` STRING,
  `publish_time` TIMESTAMP_LTZ(3) METADATA FROM 'publish_time' VIRTUAL
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  -- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
  -- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx',
  'topics' = 'user_behavior',
  'format' = 'json',
  'source.subscription-name' = 'flink',
  'source.start.message-id' = 'earliest'
);
```
```shell
# Checking admin permissions
admin_url=http://172.28.28.46:8080,172.28.28.29:8080,172.28.28.105:8080
token=eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuXXXXX
namespace=public/default
pulsar-admin --admin-url ${admin_url} --auth-params token:${token} \
  --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
  topics list ${namespace}

# Checking topic read and write permissions
service_url=pulsar://172.28.28.46:6650,172.28.28.29:6650,172.28.28.105:6650
token=eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuXXXXX
namespace=public/default
topic=xxx
subscription=yyy
pulsar-client --url ${service_url} --auth-params token:${token} \
  --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
  consume -s ${subscription} -n 10 persistent://${namespace}/${topic} -p Earliest
```
Option | Required | Default Value | Data Type | Description |
connector | Yes | - | String | The connector to use. For Apache Pulsar, use 'pulsar' or 'upsert-pulsar'. |
admin-url | Yes | - | String | The Pulsar admin URL, such as http://my-broker.example.com:8080 or https://my-broker.example.com:8443. |
service-url | Yes | - | String | The URL of the Pulsar server. A Pulsar client needs a Pulsar protocol URL to connect to a Pulsar cluster. Example: pulsar://localhost:6650. URL for multiple brokers: pulsar://localhost:6650,localhost:6651,localhost:6652. Production clusters are usually accessed via domain names, such as pulsar://pulsar.us-west.example.com:6650. URL with TLS authentication enabled: pulsar+ssl://pulsar.us-west.example.com:6651. |
topics | Yes | - | String | The names of the Apache Pulsar topics that data is read from or written to. This option can be a single topic name or multiple topic names separated by semicolons (;), such as topic-1;topic-2. You can specify topics, partitions, or both, such as topic-a-partition-0;topic-a-partition-2;some-topic2. If both a topic and one of its partitions are specified, only the topic is used. For example, some-topic1;some-topic1-partition-0 is equivalent to some-topic1. |
pulsar.client.authPluginClassName | No | - | String | The authentication plugin class name. For token authentication, use org.apache.pulsar.client.impl.auth.AuthenticationToken. |
pulsar.client.authParams | No | - | String | The authentication parameters. The format for token authentication is token:xxxx. |
explicit | No | true | Boolean | Whether the table is an explicit Flink table, which is used by PulsarCatalog. For details, see the PulsarCatalog introduction below. |
key.fields | No | - | List<String> | The physical fields in the Flink table corresponding to the key fields in Pulsar messages. Note that these fields are unrelated to primary keys. |
key.format | No | - | String | The format used to deserialize and serialize the key part of Pulsar messages. |
format | No | - | String | The format used to deserialize and serialize the value part of Pulsar messages. You must specify at least one of format and value.format. If both are specified, format takes precedence. |
value.format | No | - | String | The format used to deserialize and serialize the value part of Pulsar messages. You must specify at least one of format and value.format. If both are specified, format takes precedence. |
sink.topic-routing-mode | No | round-robin | Enum | The topic routing policy. Valid values include round-robin and message-key-hash. The default value is round-robin. You can also configure a custom routing policy using the sink.custom-topic-router option. |
sink.custom-topic-router | No | - | String | The full class name for the custom topic routing policy. If you specify this option, do not set sink.topic-routing-mode. |
sink.message-delay-interval | No | 0 | Duration | The delay time for sending messages, such as 10ms, 1s, or 1min. This allows you to delay the consumption of messages. For details, see the Pulsar document Delayed message delivery. |
pulsar.sink.deliveryGuarantee | No | none | Enum | The message delivery guarantee for the Pulsar sink. Valid values include none, at-least-once, and exactly-once. To use exactly-once, your Pulsar cluster must support transactions. |
pulsar.sink.transactionTimeoutMillis | No | 10800000 | Long | The Pulsar transaction timeout period (milliseconds), which must be longer than the checkpoint interval. The default value is 10800000 (3 hours). |
pulsar.producer.batchingEnabled | No | false | Boolean | Whether to enable batch write. |
pulsar.producer.batchingMaxMessages | No | 1000 | Int | The maximum number of Pulsar messages in a single batch when batch writing is enabled. |
source.start.message-id | No | - | String | The start of source consumption. It can be set to earliest, latest, or a specific message ID in the format ledgerId:entryId:partitionId (e.g., "12:2:-1"). |
source.start.publish-time | No | - | Long | The publishing time (Unix timestamp) of the starting message of source consumption. |
source.subscription-name | No | flink-sql-connector-pulsar-<RANDOM> | String | The Pulsar subscription name. The default value is flink-sql-connector-pulsar-<RANDOM>, where RANDOM is five random letters. |
source.subscription-type | No | Exclusive | Enum | The Pulsar subscription type. Valid values include Exclusive and Shared. For more information about subscription types, see Subscription types. |
source.stop.at-message-id | No | - | String | The end of source consumption. It can be set to earliest, latest, or a specific message ID in the format ledgerId:entryId:partitionId (e.g., "12:2:-1"). |
source.stop.at-publish-time | No | - | Long | The publishing time (Unix timestamp) of the ending message of source consumption. |
source.stop.after-message-id | No | - | String | The ID of the ending message of source consumption in the format ledgerId:entryId:partitionId (e.g., "12:2:-1"). The ending message will be consumed as well. |
pulsar.source.partitionDiscoveryIntervalMs | No | 30000 | Long | The interval (milliseconds) at which the Pulsar source checks for new partitions. If this is 0 or a negative value, partition discovery is disabled. |
pulsar.admin.requestRetries | No | 5 | Int | The number of retries in case of failure to call Pulsar admin RESTful APIs. |
pulsar.client.* | No | - | - | An arbitrary Pulsar client parameter. |
pulsar.admin.* | No | - | - | An arbitrary Pulsar admin parameter. |
pulsar.sink.* | No | - | - | An arbitrary Pulsar sink parameter. |
pulsar.producer.* | No | - | - | An arbitrary Pulsar producer API parameter. |
pulsar.source.* | No | - | - | An arbitrary Pulsar source parameter. |
pulsar.consumer.* | No | - | - | An arbitrary Pulsar consumer API parameter. |
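As an illustration of the key, value, and routing options above, the following sketch serializes the user_id field into the message key and chooses the target topic or partition by key hash. The table name pulsar_keyed_sink and the topic user_behavior_keyed are hypothetical; every option comes from the table above, but you should verify that your connector version accepts this exact combination.

```sql
-- Hypothetical sink table: user_id is written into the Pulsar message key
CREATE TABLE `pulsar_keyed_sink` (
  `user_id` bigint,
  `item_id` bigint,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  'topics' = 'user_behavior_keyed',
  -- Key part of the message, built from the physical column user_id
  'key.fields' = 'user_id',
  'key.format' = 'json',
  -- Value part of the message
  'value.format' = 'json',
  -- Route by message key hash instead of the default round-robin
  'sink.topic-routing-mode' = 'message-key-hash'
);
```

Only one key field is used here so that the example does not depend on how multiple key fields are separated.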
Metadata Key | Data Type | R/W | Description |
topic | STRING NOT NULL | R | The topic name of a Pulsar message. |
message_size | INT NOT NULL | R | The Pulsar message size. |
producer_name | STRING NOT NULL | R | The producer name of a Pulsar message. |
message_id | BYTES NOT NULL | R | The ID of a Pulsar message. |
sequenceId | BIGINT NOT NULL | R | The sequence ID of a Pulsar message. |
publish_time | TIMESTAMP_LTZ(3) NOT NULL | R | The publishing time of a Pulsar message. |
event_time | TIMESTAMP_LTZ(3) NOT NULL | R/W | The event time of a Pulsar message. |
properties | MAP<STRING, STRING> NOT NULL | R/W | The properties of a Pulsar message. |
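The following sketch declares several of the metadata keys listed above next to the physical columns used earlier. The table name pulsar_with_metadata is hypothetical. Read-only (R) keys are declared VIRTUAL, while event_time and properties are R/W and are left non-virtual, so an INSERT INTO this table would also write them.

```sql
CREATE TABLE `pulsar_with_metadata` (
  `user_id` bigint,
  `item_id` bigint,
  `behavior` STRING,
  -- Read-only metadata (R): VIRTUAL, so INSERT INTO ignores these columns
  `topic` STRING METADATA VIRTUAL,
  `message_id` BYTES METADATA VIRTUAL,
  `publish_time` TIMESTAMP_LTZ(3) METADATA FROM 'publish_time' VIRTUAL,
  -- Readable and writable metadata (R/W)
  `event_time` TIMESTAMP_LTZ(3) METADATA,
  `properties` MAP<STRING, STRING> METADATA
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  'topics' = 'user_behavior',
  'format' = 'json'
);
```

When the column name equals the metadata key, as for topic and event_time, the FROM clause can be omitted.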
Read-only (R) metadata columns must be declared VIRTUAL so that they are excluded from INSERT INTO operations.

Pulsar Schema | Flink Format |
AVRO | avro |
JSON | json |
PROTOBUF | Not supported yet |
PROTOBUF_NATIVE | Not supported yet |
AUTO_CONSUME | Not supported yet |
AUTO_PUBLISH | Not supported yet |
NONE/BYTES | raw |
BOOLEAN | raw |
STRING | raw |
DOUBLE | raw |
FLOAT | raw |
INT8 | raw |
INT16 | raw |
INT32 | raw |
INT64 | raw |
LOCAL_DATE | Not supported yet |
LOCAL_TIME | Not supported yet |
LOCAL_DATE_TIME | Not supported yet |
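For topics that use one of the primitive Pulsar schemas mapped to raw above, the Flink table has a single physical column, because Flink's raw format supports exactly one physical column. A minimal sketch for a topic with a STRING schema; the topic string_topic and the table pulsar_string_source are hypothetical.

```sql
-- Hypothetical topic whose Pulsar schema is STRING
CREATE TABLE `pulsar_string_source` (
  -- raw format: the whole message value maps to this one column
  `payload` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  'topics' = 'string_topic',
  'format' = 'raw',
  'source.start.message-id' = 'earliest'
);
```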
PulsarCatalog uses the schemaInfo field in the schema of a topic to store the metadata of an explicit table. For each explicit table, PulsarCatalog creates a placeholder topic. You can specify the tenant of this topic using the catalog-tenant option; the default tenant is __flink_catalog. Your Flink database maps to a namespace with the same name under this tenant, and a topic named table_<FLINK_TABLE_NAME> is created in that namespace. The schema of this topic stores the metadata of the Flink table.

For example, for a Flink database testdb and a Flink table users, PulsarCatalog creates a topic table_users in the namespace testdb under the tenant __flink_catalog. table_users is a placeholder topic because it has no producers or consumers; its schema is used only to store the metadata of the Flink table. To view the stored metadata, run:

```shell
pulsar-admin schemas get persistent://<tenant>/<namespace>/<topic>
```
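To illustrate the mapping, the sketch below creates the testdb database and users table from the example above through a catalog registered as pulsar (as in the CREATE CATALOG examples in this document). It assumes PulsarCatalog accepts CREATE DATABASE; the columns and the data topic users_topic are hypothetical.

```sql
-- Assumes a PulsarCatalog named `pulsar` has already been created
USE CATALOG `pulsar`;

-- Maps to the namespace testdb under the catalog tenant (default: __flink_catalog)
CREATE DATABASE IF NOT EXISTS `testdb`;

-- Explicit table; its metadata is stored in the schema of the
-- placeholder topic table_users in namespace testdb
CREATE TABLE IF NOT EXISTS `testdb`.`users` (
  `user_id` bigint,
  `name` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  -- Hypothetical topic that actually carries the data
  'topics' = 'users_topic',
  'format' = 'json'
);
```

Afterwards, pulsar-admin schemas get persistent://__flink_catalog/testdb/table_users should show the stored table definition.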
Pulsar Schema | Flink Data Type | Flink Format | Supported |
AVRO | It is decided by the Avro format. | avro | Yes |
JSON | It is decided by the JSON format. | json | Yes |
PROTOBUF | Not supported yet | / | No |
PROTOBUF_NATIVE | It is decided by the Protobuf definition. | Not supported yet | No |
AUTO_CONSUME | Not supported yet | / | No |
AUTO_PUBLISH | Not supported yet | / | No |
NONE/BYTES | DataTypes.BYTES() | raw | Yes |
BOOLEAN | DataTypes.BOOLEAN() | raw | Yes |
LOCAL_DATE | DataTypes.DATE() | / | No |
LOCAL_TIME | DataTypes.TIME() | / | No |
LOCAL_DATE_TIME | DataTypes.TIMESTAMP(3) | / | No |
STRING | DataTypes.STRING() | raw | Yes |
DOUBLE | DataTypes.DOUBLE() | raw | Yes |
FLOAT | DataTypes.FLOAT() | raw | Yes |
INT8 | DataTypes.TINYINT() | raw | Yes |
INT16 | DataTypes.SMALLINT() | raw | Yes |
INT32 | DataTypes.INT() | raw | Yes |
INT64 | DataTypes.BIGINT() | raw | Yes |
Although LOCAL_DATE and LOCAL_TIME have corresponding Flink data types, Flink cannot parse data based on these two schema types, so automatic schema mapping fails for them.

PulsarCatalog maps the Pulsar tenant/namespace to the Flink database and the topic name to the Flink table name.

… GenericInMemoryCatalog. You can bind an explicit table to a Pulsar topic, and each Pulsar topic can be bound to multiple Flink tables.

Key | Default | Type | Description | Required |
catalog-admin-url | "http://localhost:8080" | String | The Pulsar admin URL, such as http://my-broker.example.com:8080 or https://my-broker.example.com:8443. | Yes |
catalog-auth-params | - | String | The authentication parameters for accessing the Pulsar cluster. | - |
catalog-auth-plugin | - | String | The name of the authentication plugin for accessing the Pulsar cluster. | - |
catalog-service-url | "pulsar://localhost:6650" | String | The URL of the Pulsar server. A Pulsar client needs a Pulsar protocol URL to connect to a Pulsar cluster. Example: pulsar://localhost:6650. URL for multiple brokers: pulsar://localhost:6650,localhost:6651,localhost:6652. Production clusters are usually accessed via domain names, such as pulsar://pulsar.us-west.example.com:6650. URL with TLS authentication enabled: pulsar+ssl://pulsar.us-west.example.com:6651. | Yes |
catalog-tenant | "__flink_catalog" | String | The Pulsar tenant that stores table information. | - |
default-database | "default_database" | String | The default database of PulsarCatalog. If a database with this name does not exist, one will be created automatically. | - |
```sql
CREATE CATALOG pulsar WITH (
  'type' = 'pulsar-catalog',
  -- 'catalog-auth-plugin' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
  -- 'catalog-auth-params' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuxxxxx',
  'catalog-admin-url' = '<ADMIN_URL>',
  'catalog-service-url' = '<SERVICE_URL>'
);
```
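A variant of the catalog definition above that overrides catalog-tenant and default-database. The catalog name pulsar_custom, the tenant my_flink_catalog, and the database my_db are hypothetical; the tenant presumably has to exist in the Pulsar cluster (or be creatable with your credentials), which you should check before relying on this.

```sql
CREATE CATALOG `pulsar_custom` WITH (
  'type' = 'pulsar-catalog',
  'catalog-admin-url' = '<ADMIN_URL>',
  'catalog-service-url' = '<SERVICE_URL>',
  -- Store explicit-table metadata under this tenant instead of __flink_catalog
  'catalog-tenant' = 'my_flink_catalog',
  -- Created automatically if it does not exist
  'default-database' = 'my_db'
);

-- Make the new catalog the current one for subsequent statements
USE CATALOG `pulsar_custom`;
```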
```sql
CREATE TABLE `pulsar_source` (
  `user_id` bigint,
  `item_id` bigint,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  -- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
  -- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx',
  'topics' = 'topic_source',
  'format' = 'json',
  'source.subscription-name' = 'flink',
  'source.start.message-id' = 'earliest'
);

CREATE TABLE `pulsar_sink` (
  `user_id` bigint,
  `item_id` bigint,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  -- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
  -- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx',
  'topics' = 'topic_sink',
  'format' = 'json',
  'pulsar.sink.deliveryGuarantee' = 'exactly-once',
  'pulsar.sink.transactionTimeoutMillis' = '120000'
);

INSERT INTO `pulsar_sink` SELECT * FROM `pulsar_source`;
```
```sql
CREATE CATALOG `pulsar` WITH (
  'type' = 'pulsar-catalog',
  -- 'catalog-auth-plugin' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
  -- 'catalog-auth-params' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuxxxxx',
  'catalog-admin-url' = 'http://pulsar:8080',
  'catalog-service-url' = 'pulsar://pulsar:6650'
);

INSERT INTO `pulsar`.`default_database`.`pulsar_sink` SELECT * FROM `pulsar`.`default_database`.`pulsar_source`;
```
The pulsar_source and pulsar_sink tables in the above example are created using the following statements, which can be put in the same SQL job:

```sql
CREATE TABLE IF NOT EXISTS `pulsar`.`default_database`.`pulsar_source` (
  `user_id` bigint,
  `item_id` bigint,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  -- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
  -- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx',
  'topics' = 'topic_source',
  'format' = 'json',
  'source.subscription-name' = 'flink',
  'source.start.message-id' = 'earliest'
);

CREATE TABLE IF NOT EXISTS `pulsar`.`default_database`.`pulsar_sink` (
  `user_id` bigint,
  `item_id` bigint,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  -- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
  -- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx',
  'topics' = 'topic_sink',
  'format' = 'json',
  'pulsar.sink.deliveryGuarantee' = 'exactly-once',
  'pulsar.sink.transactionTimeoutMillis' = '120000'
);
```
```json
{
  "schema": "{\"type\":\"record\",\"name\":\"userBehavior\",\"namespace\":\"my.example\",\"fields\":[{\"name\":\"user_id\",\"type\":\"long\"},{\"name\":\"item_id\",\"type\":\"long\"},{\"name\":\"behavior\",\"type\":\"string\"}]}",
  "type": "JSON",
  "properties": {}
}
```
```shell
# Configure the schema
bin/pulsar-admin schemas upload -f ./schema.json topic_source
bin/pulsar-admin schemas upload -f ./schema.json topic_sink

# Check the schema
bin/pulsar-admin schemas get topic_source
bin/pulsar-admin schemas get topic_sink
```
In this example, public/default is the tenant/namespace, which exists by default in a Pulsar cluster.

```sql
CREATE CATALOG `pulsar` WITH (
  'type' = 'pulsar-catalog',
  -- 'catalog-auth-plugin' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
  -- 'catalog-auth-params' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuxxxxx',
  'catalog-admin-url' = 'http://pulsar:8080',
  'catalog-service-url' = 'pulsar://pulsar:6650'
);

INSERT INTO `pulsar`.`public/default`.`topic_sink` SELECT * FROM `pulsar`.`public/default`.`topic_source`;
```
java.lang.NullPointerException: You haven't enable transaction in Pulsar client.
Run pulsar-admin transactions slow-transactions -t 1s to view transactions in the OPEN state. After the OPEN transactions are committed or aborted, you can read the data written to the topic after the restart.
Suggestion: Configure an appropriate transaction timeout period using WITH parameters (the default Pulsar transaction timeout is 3 hours). For example, 'pulsar.sink.transactionTimeoutMillis' = '120000' sets the timeout period to 2 minutes. Note that the transaction timeout period must be longer than the checkpoint interval.

If the error java.lang.IllegalArgumentException: We only support normal message id currently occurs, it is because batch writing was enabled when the data was written to Pulsar. Currently, the Pulsar source cannot restore from messages written in batches. In Stream Compute Service, batch writing is disabled by default for Pulsar sinks.

```
Caused by: java.lang.IllegalArgumentException: We only support normal message id currently.
```
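The two recommendations above can be combined in one sink definition. This sketch assumes a 1-minute checkpoint interval set through Flink's execution.checkpointing.interval option (shown as a SQL client SET statement; your platform may set it elsewhere), keeps batch writing explicitly disabled, and uses a 2-minute transaction timeout so that it exceeds the checkpoint interval. The table name pulsar_eo_sink is hypothetical.

```sql
-- Checkpoint interval; the transaction timeout below must be longer than this
SET 'execution.checkpointing.interval' = '1min';

CREATE TABLE `pulsar_eo_sink` (
  `user_id` bigint,
  `item_id` bigint,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  'topics' = 'topic_sink',
  'format' = 'json',
  -- Requires transactions to be enabled in the Pulsar cluster
  'pulsar.sink.deliveryGuarantee' = 'exactly-once',
  -- 120000 ms = 2 minutes, longer than the 1-minute checkpoint interval
  'pulsar.sink.transactionTimeoutMillis' = '120000',
  -- Keep batching off so that a Pulsar source can later restore from these messages
  'pulsar.producer.batchingEnabled' = 'false'
);
```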
… PulsarOrderedPartitionSplitReader#beforeCreatingConsumer).

… NonDurable subscription mode for the Pulsar source?

PulsarSourceEnumerator#createSubscription creates a Durable subscription first. If PulsarPartitionSplitReaderBase#createPulsarConsumer then consumes data in the NonDurable mode, the error "Durable subscription with the same name already exists" occurs.

Option | Required | Default Value | Data Type | Description |
pulsar.consumer.subscriptionMode | No | Durable | Enum | The Pulsar subscription mode. Valid values include Durable and NonDurable. In the Durable mode, the cursor is durable, which retains messages and persists the current position. If a broker restarts from a failure, it can recover the cursor from the persistent storage (bookie), so that messages can continue to be consumed from the last consumed position. In the NonDurable mode, once a broker stops, the cursor is lost and can never be recovered, so messages cannot continue to be consumed from the last consumed position. To learn more, see Subscription modes. |
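Given the conflict described above, a source that should resume from its last position can keep the default Durable mode together with a fixed subscription name. The sketch below sets both explicitly; the table name pulsar_durable_source is hypothetical, and the options are the ones documented in this section.

```sql
CREATE TABLE `pulsar_durable_source` (
  `user_id` bigint,
  `item_id` bigint,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  'topics' = 'topic_source',
  'format' = 'json',
  -- Fixed name instead of the random default flink-sql-connector-pulsar-<RANDOM>
  'source.subscription-name' = 'flink',
  'source.subscription-type' = 'Exclusive',
  -- Same mode as the subscription created by PulsarSourceEnumerator
  'pulsar.consumer.subscriptionMode' = 'Durable'
);
```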
… 174:1:0 > 174:1:-1.

… publish-time?

… publish-time will return HTTP 307 Temporary Redirect, and the Pulsar client API used in the Flink connector will return HTTP 500 Server Error, so the job fails to start. To view the error, call the Pulsar admin RESTful API that looks up a message ID by publishing time:

```shell
## 1662480195714 is a publishing time accurate to the millisecond.
curl http://${adminUrl}:8080/admin/v2/persistent/public/default/${topic}/messageid/1662480195714
```
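If the lookup above succeeds, the same millisecond timestamp can be used as the starting point of the source. A sketch that reuses the value 1662480195714 from the curl example; the table name pulsar_from_time is hypothetical.

```sql
CREATE TABLE `pulsar_from_time` (
  `user_id` bigint,
  `item_id` bigint,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  'topics' = 'topic_source',
  'format' = 'json',
  'source.subscription-name' = 'flink',
  -- Start consuming from messages published at this time (Unix timestamp in ms)
  'source.start.publish-time' = '1662480195714'
);
```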
… publish-time is not used, you can specify source.stop.at-publish-time.

… pulsar.admin.requestRetries (the number of retries for RESTful API calls, 5 by default) to avoid this issue.

… 'pulsar.source.partitionDiscoveryIntervalMs' = '0'.
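The options mentioned above are set in the WITH clause like any other connector option. A sketch with hypothetical values: more admin API retries than the default 5, partition discovery disabled with 0 (per the option table, 0 or a negative value disables it), and a stop time for a bounded read. The table name pulsar_tuned_source and the stop timestamp are hypothetical.

```sql
CREATE TABLE `pulsar_tuned_source` (
  `user_id` bigint,
  `item_id` bigint,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  'topics' = 'topic_source',
  'format' = 'json',
  'source.start.message-id' = 'earliest',
  -- Hypothetical stop point: publishing time (Unix timestamp in ms)
  'source.stop.at-publish-time' = '1662480195714',
  -- Retry Pulsar admin RESTful API calls up to 10 times (default: 5)
  'pulsar.admin.requestRetries' = '10',
  -- 0 disables periodic checks for new partitions (default: 30000 ms)
  'pulsar.source.partitionDiscoveryIntervalMs' = '0'
);
```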