英迈思做的网站怎么样南宁致峰网站建设
2026/1/10 7:59:01 网站建设 项目流程
英迈思做的网站怎么样,南宁致峰网站建设,泰安支点网络科技有限公司,做盗版电影网站赚钱目录 1. 项目背景与环境 1.1 节点角色分配 2. 核心规划 (关键避坑) 2.1 端口规划表 2.2 目录规划 3. 部署前置准备 (所有节点 nd4, nd5, nd6) 3.1 系统配置 (需 sudo 权限) 3.2 检查 CPU AVX2 支持 3.3 安装 JDK 17 (必须) 3.4 准备程序包与目录 4. Frontend (FE) 部…目录1. 项目背景与环境1.1 节点角色分配2. 核心规划 (关键避坑)2.1 端口规划表2.2 目录规划3. 部署前置准备 (所有节点 nd4, nd5, nd6)3.1 系统配置 (需 sudo 权限)3.2 检查 CPU AVX2 支持3.3 安装 JDK 17 (必须)3.4 准备程序包与目录4. Frontend (FE) 部署4.1 修改 FE 配置4.2 启动 Leader 节点 (nd4)4.3 启动 Follower 节点 (nd5, nd6)4.4 注册节点 (构建 HA)5. Backend (BE) 部署5.1 修改 BE 配置5.2 启动 BE​编辑5.3 注册 BE 节点5.4 访问 WebUI6. Hadoop Paimon 集成6.1 分发 Hadoop 配置文件6.2 创建 Paimon Catalog6.3 验证查询6.3.1 切换 Catalog 与 数据库6.3.2 查看表列表6.3.3 数据查询验证6.4 三种方案的区别与特点6.4.1 方案一Hive Catalog (HA Mode / 高可用模式)6.4.2 方案二Filesystem Catalog (文件系统模式)6.4.3 方案三Hive Catalog (Single Node / 单点模式)6.4.4 综合对比表6.4.5 最终推荐7. 故障排查总结 (Troubleshooting)8.基础测试增删改查8.1 插入数据8.2 更新数据8.3 删除数据9. Paimon外表数据写入StarRocks内表基础测试9.1 Paimon数据准备9.2 测试代码9.3 验证数据1. 项目背景与技术演进在早期的基准测试中虽然StarRocks 2.5.22展现了优秀的向量化执行性能但受限于当时的架构设计其在Paimon数据湖的集成上存在明显短板不支持原生 Paimon Catalog导致元数据管理复杂且无法完全发挥湖仓分析的性能潜力。随着本项目Paimon OLAP架构的演进以及友商Doris 4.0的重大升级为了确保技术选型的公平性与前瞻性项目组决定部署StarRocks 4.0.2。该版本是 StarRocks 迈向“极速统一湖仓”的里程碑式版本它不仅原生支持了 Paimon Catalog彻底解决了 2.x 时代的集成痛点还通过JDK 17的引入及AVX2指令集的深度优化试图在复杂查询与高并发场景下重塑性能标杆。2. 测试目标本次部署不仅仅是版本的迭代更是一场关于“极致性能与现代湖仓架构”的深度验证。核心目标如下原生湖仓集成验证Native Integration重点验证 StarRocks 4.0.2原生 Paimon Catalog的连通性与稳定性评估其相比 2.5 版本“Hive 兼容模式”在元数据同步、Schema 自动发现及分区裁剪方面的体验提升。代际性能基准测试Generational Benchmarking在同等硬件资源下对比 StarRocks 4.0.2 与 StarRocks 2.5 及 Doris 4.0 在海量数据扫描、多表关联分析Join场景下的性能差异量化Pipeline 并行执行引擎与JDK 17带来的吞吐收益。复杂场景适应性验证新版本在处理 Primary Key 模型更新、存算分离可选及资源隔离方面的表现确认其是否能在保持极速查询优势的同时弥补大字段处理等历史短板。3.补充说明StarRocks 2.x不支持对Paimon的操作高版本的StarRocks对于数据也不支持写回Paimon。官网描述网址Paimon catalog | StarRocks调研架构图如下1. 项目背景与环境本次部署旨在现有的 CDH 6.3.2 集群上混合部署 StarRocks 4.0.2 版本用于 Paimon 数据湖分析。操作系统: CentOS 7 (CDH 6.3.2 环境)部署用户:bigdataStarRocks 版本: 4.0.2JDK 要求:JDK 17(StarRocks 3.x/4.x 强制要求 JDK 17)原有 CDH 节点:nd1-nd6,nd11-nd16本次部署目标节点:nd4,nd5,nd61.1 节点角色分配前置组件分配IP主机名角色版本10.x.xx.201-10.x.xx.205 10.x.xx.215 10.x.xx.149 10.x.xx.151 10.x.xx.156 10.x.xx.157 10.x.xx.167 10.x.xx.206nd1-nd5 nd6 nd11 nd12 nd13 nd14 nd15 nd16CDH6.3.210.x.xx.201-10.x.xx.205nd1-nd5Paimon1.1.1采用FE (Frontend) BE (Backend) 混合部署模式共 3 个节点。IP主机名角色部署组件说明10.x.xx.204nd4FE (Leader) BEStarRocks 4.0.2初始 Leader 节点10.x.xx.205nd5FE (Follower) BEStarRocks 4.0.2高可用节点10.x.xx.215nd6FE (Follower) BEStarRocks 4.0.2高可用节点2. 核心规划 (关键避坑)由于 CDH 的 YARN 占用 8040Zookeeper 占用 9010必须对 StarRocks 默认端口进行修改。2.1 端口规划表组件配置文件参数默认端口规划端口修改原因FEhttp_port80308030无冲突保持默认FErpc_port90209020无冲突保持默认FEquery_port90309030MySQL 连接端口FEedit_log_port901019010避开 CDH Zookeeper JMXBEbe_port90609060Thrift ServerBEwebserver_port804018040避开 CDH YARN NodeManagerBEheartbeat_service_port90509050心跳服务BEbrpc_port80608060数据传输2.2 目录规划安装根目录:/home/bigdata/starrocks软件目录:/home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64JDK 17 目录:/home/bigdata/starrocks/jdk-17.0.2FE 元数据:/home/bigdata/starrocks/data/metaBE 数据存储:/home/bigdata/starrocks/data/storage3. 部署前置准备 (所有节点 nd4, nd5, nd6)3.1 系统配置 (需 sudo 权限)# 1. 关闭 Swap sudo swapoff -a # (可选) 永久关闭: sudo sed -i /swap/s/^/#/ /etc/fstab ​ # 2. 修改文件句柄限制 sudo vi /etc/security/limits.conf # 添加或修改以下内容: * soft nofile 65535 * hard nofile 65535 * soft nproc 65535 * hard nproc 655353.2 检查 CPU AVX2 支持StarRocks 4.x 默认开启 AVX2 指令集优化。cat /proc/cpuinfo | grep avx2有输出: 正常。无输出: 需在be.conf中添加enable_avx2 false。3.3 安装 JDK 17 (必须)StarRocks 4.0.2 不支持 JDK 8 或 11必须使用 JDK 17。# 创建目录 mkdir -p /home/bigdata/starrocks cd /home/bigdata/starrocks ​ # 下载 JDK 17 (如果您已通过 wget 下载) # wget https://download.java.net/java/GA/jdk17.0.2/dfd4a8d0985749f896bed50d7138ee7f/8/GPL/openjdk-17.0.2_linux-x64_bin.tar.gz ​ # 解压 tar -zxvf openjdk-17.0.2_linux-x64_bin.tar.gz -C /home/bigdata/starrocks/ # 验证路径 ls -ld /home/bigdata/starrocks/jdk-17.0.23.4 准备程序包与目录# 1. 解压 StarRocks 包 tar -zxvf StarRocks-4.0.2-centos-amd64.tar.gz -C /home/bigdata/starrocks/ ​ # 2. 创建数据目录 mkdir -p /home/bigdata/starrocks/data/meta mkdir -p /home/bigdata/starrocks/data/storage4. Frontend (FE) 部署操作节点: nd4, nd5, nd64.1 修改 FE 配置编辑/home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/fe/conf/fe.confvi /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/fe/conf/fe.conf修改或者添加如下内容# ----------------------------------- # 1. 核心目录与 JDK (必须修改) # ----------------------------------- meta_dir /home/bigdata/starrocks/data/meta ​ # 指向独立的 JDK 17 路径 JAVA_HOME /home/bigdata/starrocks/jdk-17.0.2 ​ # ----------------------------------- # 2. 端口修改 (解决 ZK 冲突) # ----------------------------------- http_port 8030 rpc_port 9020 query_port 9030 # 修改 edit_log_port 为 19010避开 CDH Zookeeper edit_log_port 19010 ​ # ----------------------------------- # 3. 网络绑定 # ----------------------------------- # 根据您的 IP 段 (10.8.16.x) 配置 priority_networks 10.8.16.0/244.2 启动 Leader 节点 (nd4)仅在 nd4 (10.x.xx.204) 上执行cd /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/fe/bin ./start_fe.sh --daemon检查:tail -f ../log/fe.log看到thrift server started表示成功。4.3 启动 Follower 节点 (nd5, nd6)注意: 因为edit_log_port改为了19010--helper参数必须对应。在 nd5 (10.x.xx.205) 执行:cd /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/fe/bin ./start_fe.sh --helper 10.x.xx.204:19010 --daemon在 nd6 (10.x.xx.215) 执行:cd /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/fe/bin ./start_fe.sh --helper 10.x.xx.204:19010 --daemon4.4 注册节点 (构建 HA)回到nd4连接 MySQL 并添加节点mysql -h 10.x.xx.204 -P 9030 -u root-- 添加 nd5 (注意端口是 19010) ALTER SYSTEM ADD FOLLOWER 10.x.xx.205:19010; -- 添加 nd6 (注意端口是 19010) ALTER SYSTEM ADD FOLLOWER 10.x.xx.215:19010; -- 验证 SHOW PROC /frontends;应显示 3 个节点状态均为Alive: true。5. Backend (BE) 部署操作节点: nd4, nd5, nd65.1 修改 BE 配置编辑/home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/be/conf/be.confvi /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/be/conf/be.conf修改或者增加下述内容# ----------------------------------- # 1. JDK 配置 (必须 JDK 17) # ----------------------------------- JAVA_HOME /home/bigdata/starrocks/jdk-17.0.2 # ----------------------------------- # 2. 端口修改 (解决 YARN 冲突) # ----------------------------------- be_port 9060 # 修改 web_server_port 为 18040避开 YARN NodeManager web_server_port 18040 heartbeat_service_port 9050 brpc_port 8060 # ----------------------------------- # 3. 存储与网络 # ----------------------------------- storage_root_path /home/bigdata/starrocks/data/storage priority_networks 10.8.16.0/245.2 启动 BE在三个节点上分别执行cd /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/be/bin ./start_be.sh --daemon检查:netstat -tpln | grep 18040确认端口监听成功。5.3 注册 BE 节点回到nd4的 MySQL 客户端执行-- 注意端口是 heartbeat_service_port (默认 9050) ALTER SYSTEM ADD BACKEND 10.x.xx.204:9050; ALTER SYSTEM ADD BACKEND 10.x.xx.205:9050; ALTER SYSTEM ADD BACKEND 10.x.xx.215:9050; -- 验证 SHOW PROC /backends;应显示 3 个节点Alive: true。5.4 访问 WebUI打开浏览器访问 FE 的 WebUI地址:http://10.8.16.204:8030用户:root密码: (空)FE节点状态BE节点状态6. Hadoop Paimon 集成StarRocks 4.0 已经原生支持 Paimon Catalog无需像 2.5 版本那样纠结。6.1 分发 Hadoop 配置文件为了让 StarRocks 能够访问 CDH HDFS需要复制配置文件。# 在 nd4, nd5, nd6 上分别执行 # FE scp root10.x.xx.201:/etc/hadoop/conf/core-site.xml /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/fe/conf/ scp root10.x.xx.201:/etc/hadoop/conf/hdfs-site.xml /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/fe/conf/ scp root10.x.xx.201:/etc/hive/conf/hive-site.xml /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/fe/conf/ # BE scp root10.x.xx.201:/etc/hadoop/conf/core-site.xml /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/be/conf/ scp root10.x.xx.201:/etc/hadoop/conf/hdfs-site.xml /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/be/conf/ scp root10.x.xx.201:/etc/hive/conf/hive-site.xml /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/be/conf/注意: 复制完成后建议重启所有 FE 和 BE。# 启动 FE: sh fe/bin/start_fe.sh --daemon # 停止 FE: sh fe/bin/stop_fe.sh # 启动 BE: sh be/bin/start_be.sh --daemon #停止 BE: sh be/bin/stop_be.sh6.2 创建 Paimon Catalog在 MySQL 客户端执行DROP CATALOG IF EXISTS paimon_catalog; CREATE EXTERNAL CATALOG paimon_catalog PROPERTIES ( -- 1. StarRocks 自身的类型 type paimon, -- 2. 【核心修改】Paimon SDK 识别的 HMS 类型是 hive不是 hms paimon.catalog.type hive, -- 3. HMS 地址 hive.metastore.uris thrift://nd1:9083,thrift://nd3:9083, -- 4. 指定数仓路径 paimon.catalog.warehouse hdfs://nd1:8020/user/hive/warehouse, -- 5. 认证方式 hadoop.security.authentication simple );6.3 验证查询6.3.1 切换 Catalog 与 数据库-- 1. 切换到该 Catalog SET CATALOG paimon_catalog; -- 2. 查看数据库 SHOW DATABASES;6.3.2 查看表列表-- 3. 进入某个库 (这里为ods) USE ods; -- 4. 查看表 SHOW TABLES;6.3.3 数据查询验证-- 5. 查询数据 SELECT * FROM t_admin_division_code LIMIT 5;6.4 三种方案的区别与特点6.4.1 方案一Hive Catalog (HA Mode / 高可用模式)DROP CATALOG IF EXISTS paimon_catalog; CREATE EXTERNAL CATALOG paimon_catalog PROPERTIES ( -- 1. StarRocks 自身的类型 type paimon, -- 2. 【核心修改】Paimon SDK 识别的 HMS 类型是 hive不是 hms paimon.catalog.type hive, -- 3. HMS 地址 hive.metastore.uris thrift://nd1:9083,thrift://nd3:9083, -- 4. 指定数仓路径 paimon.catalog.warehouse hdfs://nd1:8020/user/hive/warehouse, -- 5. 认证方式 hadoop.security.authentication simple ); USE ods; SELECT * FROM t_admin_division_code LIMIT 5;配置特征:paimon.catalog.type hive, 且hive.metastore.uris配置了多个地址(thrift://nd1:9083,thrift://nd3:9083)。原理: 同方案三但客户端StarRocks支持故障转移。特点:高可用 (HA): 如果nd1挂了StarRocks 会自动尝试连接nd3。这与 CDH 集群本身的高可用架构是匹配的。生产标准: 这是企业级生产环境最稳健的配置。6.4.2 方案二Filesystem Catalog (文件系统模式)DROP CATALOG IF EXISTS paimon_catalog; CREATE EXTERNAL CATALOG paimon_catalog PROPERTIES ( type paimon, paimon.catalog.type filesystem, paimon.catalog.warehouse hdfs://nd1:8020/user/hive/warehouse, hadoop.security.authentication simple ); USE ods; SELECT * FROM t_admin_division_code LIMIT 5;配置特征:paimon.catalog.type filesystem原理: StarRocks 不经过 Hive Metastore而是直接去扫描 HDFS 目录结构来获取数据库和表的元数据Schema、分区信息等。特点:去中心化: 完全不依赖 Hive 服务。即使 CDH 的 Hive Metastore 挂了只要 HDFS 活着StarRocks 就能查 Paimon。局限性: 它是“孤独”的。如果您的 Flink 任务是用 Hive Catalog 写入的而 StarRocks 用 Filesystem Catalog 读取一旦路径配置稍有偏差两边看到的数据可能不一致。且无法利用 Hive 的权限控制Ranger/Sentry。性能: 对于超多表或海量文件的场景频繁 listing HDFS 目录可能比直接查 Hive Metastore 慢。6.4.3 方案三Hive Catalog (Single Node / 单点模式)DROP CATALOG IF EXISTS paimon_catalog; CREATE EXTERNAL CATALOG paimon_catalog PROPERTIES ( -- 1. StarRocks 自身的类型 type paimon, -- 2. 【核心修改】Paimon SDK 识别的 HMS 类型是 hive不是 hms paimon.catalog.type hive, -- 3. HMS 地址 hive.metastore.uris thrift://nd1:9083, -- 4. 指定数仓路径 paimon.catalog.warehouse hdfs://nd1:8020/user/hive/warehouse, -- 5. 认证方式 hadoop.security.authentication simple ); USE ods; SELECT * FROM t_admin_division_code LIMIT 5;配置特征:paimon.catalog.type hive, 且hive.metastore.uris只有一个地址(thrift://nd1:9083)。原理: StarRocks 通过 Thrift 协议连接 Hive Metastore 获取元数据然后去 HDFS 读取数据。特点:统一元数据: 实现了 Flink写入端和 StarRocks读取端的元数据统一是生产环境的标准做法。单点故障风险:这是最大的缺点。如果nd1节点的 Hive Metastore 服务宕机或进行维护StarRocks 将无法查询任何 Paimon 表整个分析链路中断。6.4.4 综合对比表维度方案一 (Hive HA)方案二 (Filesystem)方案三 (Hive 单点)元数据存储Hive MetastoreHDFS 文件系统Hive Metastore依赖组件HDFS Hive MS仅 HDFSHDFS Hive MS与其他引擎互通极好差 (需手动对齐路径)好 (共享 HMS)抗风险能力高 (自动故障转移)中 (依赖 HDFS)低 (存在单点故障)适用场景生产环境测试、Hive 服务不可用时临时测试6.4.5 最终推荐强烈推荐使用【方案一】推荐理由架构统一: Paimon 的最佳实践是使用 Hive Catalog这样 Flink 实时写入、Spark 离线修正、StarRocks 实时分析都能看到同一份元数据避免了“分裂”。高可用性: 您的 CDH 环境既然部署了两个 Metastore (nd1, nd3)就应该充分利用。配置多个 URI 可以确保在一个节点宕机时StarRocks 的查询业务不受影响。未来扩展: 如果未来需要做权限管控如 Ranger基于 Hive Catalog 的链路更容易集成。7. 故障排查总结 (Troubleshooting)在部署过程中我们解决了以下核心问题后续运维请注意JDK 版本报错:现象:Error: JDK 11 is not supported...解决: StarRocks 4.0.2 强制要求JDK 17。必须在fe.conf和be.conf中显式配置JAVA_HOME .../jdk-17.0.2。FE 启动失败 / 端口冲突:现象:edit_log_port 9010 is already in use。原因: CDH 的 Zookeeper (zookeep) 占用了 9010。解决: 将fe.conf中的edit_log_port修改为19010。注意所有 FE 节点必须一致且 helper 参数也要用新端口。YARN 端口冲突:原因: StarRocks BE 默认webserver_port为 8040与 YARN NodeManager 冲突。解决: 在be.conf中将webserver_port修改为18040。残留进程:现象: 修改配置后重启端口仍被占用。解决: 使用ps -ef | grep starrocks查出 PID使用kill -9彻底杀掉旧进程后再启动。权限问题:现象:Permission denied。解决: 确保/home/bigdata/starrocks及其子目录的归属权是bigdata:bigdata。如果曾用 root 启动过需执行sudo chown -R bigdata:bigdata /home/bigdata/starrocks修复。8.基础测试增删改查将下述测试代码保存为starrocks4_test.py【python解释器3.8.20、windows系统11】# -*- coding: utf-8 -*- import pymysql import random import time import logging import functools from sshtunnel import SSHTunnelForwarder # 配置信息 (适配 StarRocks 4.0.2) # 1. SSH 连接信息 # 修改为新集群的 FE Leader 节点 (nd4) SSH_HOST 10.x.xx.204 SSH_PORT 22 SSH_USER xxxxxx SSH_PASSWORD xxxxxxxxxxxxxxx # 2. StarRocks 数据库信息 SR_LOCAL_HOST 127.0.0.1 # 【关键修改】StarRocks 4.0.2 部署文档中 query_port 保持了默认的 9030 SR_QUERY_PORT 9030 SR_DB_USER root SR_DB_PWD DB_NAME python_perf_test_sr4 # 库名区分 4.0 TABLE_NAME student_scores_perf_sr4 LOG_FILE starrocks4_test_report.log # 日志与工具模块 (通用) def setup_logger(): logger logging.getLogger(StarRocks4Tester) logger.setLevel(logging.INFO) if logger.hasHandlers(): logger.handlers.clear() file_handler logging.FileHandler(LOG_FILE, modew, encodingutf-8) console_handler logging.StreamHandler() formatter logging.Formatter(%(asctime)s - %(levelname)s - %(message)s) file_handler.setFormatter(formatter) console_handler.setFormatter(formatter) logger.addHandler(file_handler) logger.addHandler(console_handler) return logger logger setup_logger() def measure_time(func): functools.wraps(func) def wrapper(*args, **kwargs): start_time time.time() logger.info(f正在执行: [{func.__name__}] ...) try: result func(*args, **kwargs) duration time.time() - start_time logger.info(f执行完成: [{func.__name__}] | 耗时: {duration:.4f} 秒) return result except Exception as e: duration time.time() - start_time logger.error(f执行失败: [{func.__name__}] | 耗时: {duration:.4f} 秒 | 错误: {e}) raise e return wrapper # 业务逻辑 measure_time def init_db_and_table(cursor): cursor.execute(fCREATE DATABASE IF NOT EXISTS {DB_NAME}) cursor.execute(fUSE {DB_NAME}) # 【StarRocks 4.0 建表】 # 依然使用 Primary Key 模型 (性能最优) create_sql f CREATE TABLE IF NOT EXISTS {TABLE_NAME} ( id INT NOT NULL COMMENT 用户ID, name VARCHAR(50) COMMENT 姓名, age INT COMMENT 年龄, score INT COMMENT 分数, update_time DATETIME COMMENT 更新时间 ) PRIMARY KEY(id) DISTRIBUTED BY HASH(id) BUCKETS 3 PROPERTIES ( -- 【修改】新集群有3个BE设置副本数为3以测试高可用 replication_num 3, -- 4.0 默认开启持久化索引显式写出来更清晰 enable_persistent_index true ); cursor.execute(create_sql) # 清空表以便重复测试 cursor.execute(fTRUNCATE TABLE {TABLE_NAME}) logger.info(f数据库 {DB_NAME} 和表 {TABLE_NAME} 已初始化 (Replication3)) measure_time def insert_data_batch(cursor, count10): data [] for i in range(1, count 1): name fUser_{i:03d} age random.randint(18, 30) score random.randint(50, 100) data.append((i, name, age, score)) sql fINSERT INTO {TABLE_NAME} (id, name, age, score, update_time) VALUES (%s, %s, %s, %s, NOW()) cursor.executemany(sql, data) logger.info(f成功插入 {count} 条数据) measure_time def query_and_log(cursor, stage_name): # 简单全表查验证数据一致性 sql fSELECT * FROM {TABLE_NAME} ORDER BY id cursor.execute(sql) results cursor.fetchall() logger.info(f--- [{stage_name}] 当前总行数: {len(results)} ---) if results: # 只打印前3条 for row in results[:3]: logger.info(fRow: {row}) measure_time def update_random_data(cursor, update_count3): cursor.execute(fSELECT id FROM {TABLE_NAME}) all_ids [row[id] for row in cursor.fetchall()] if not all_ids: return target_ids random.sample(all_ids, min(len(all_ids), update_count)) for uid in target_ids: new_score random.randint(95, 100) # Primary Key 模型 update 性能极快 sql fUPDATE {TABLE_NAME} SET score %s, update_time NOW() WHERE id %s cursor.execute(sql, (new_score, uid)) logger.info(f - 更新 ID{uid}, New Score{new_score}) measure_time def delete_random_data(cursor, delete_count2): cursor.execute(fSELECT id FROM {TABLE_NAME}) all_ids [row[id] for row in cursor.fetchall()] if not all_ids: return target_ids random.sample(all_ids, min(len(all_ids), delete_count)) for uid in target_ids: sql fDELETE FROM {TABLE_NAME} WHERE id %s cursor.execute(sql, (uid,)) logger.info(f - 删除 ID{uid}) # 主流程 def main_process(): server None conn None try: logger.info( 1. 正在建立 SSH 隧道 (连接 nd4: 10.8.16.204) ...) # 建立 SSH 隧道 server SSHTunnelForwarder( (SSH_HOST, SSH_PORT), ssh_usernameSSH_USER, ssh_passwordSSH_PASSWORD, # 映射远程 9030 到本地随机端口 remote_bind_address(SR_LOCAL_HOST, SR_QUERY_PORT) ) server.start() logger.info(f SSH 隧道建立成功! 映射 StarRocks 4.0 端口 {SR_QUERY_PORT} - 本地 {server.local_bind_port}) # 2. 连接数据库 logger.info( 正在连接 StarRocks 4.0.2 ...) conn pymysql.connect( host127.0.0.1, # 连接本机 portserver.local_bind_port, # 使用隧道端口 userSR_DB_USER, passwordSR_DB_PWD, charsetutf8mb4, cursorclasspymysql.cursors.DictCursor, autocommitTrue ) cursor conn.cursor() # 3. 执行测试逻辑 # 验证建表 (验证 FE 元数据管理) init_db_and_table(cursor) # 验证插入 (验证 BE 数据写入与副本同步) insert_data_batch(cursor, count100) time.sleep(1) # StarRocks 4.0 也是准实时的稍作等待 query_and_log(cursor, 插入后) # 验证更新 (验证 Primary Key 索引机制) update_random_data(cursor, update_count5) time.sleep(1) query_and_log(cursor, 更新后) # 验证删除 delete_random_data(cursor, delete_count5) time.sleep(1) query_and_log(cursor, 删除后) logger.info( StarRocks 4.0.2 基本功能测试全部通过 ) except Exception as e: logger.error(f主流程发生错误: {e}) finally: # 清理资源 if conn: conn.close() logger.info(数据库连接已关闭) if server: server.stop() logger.info(SSH 隧道已关闭) if __name__ __main__: main_process()对应的log文件内容如下2025-12-10 18:13:50,107 - INFO - 1. 正在建立 SSH 隧道 (连接 nd4: 10.x.xx.204) ... 2025-12-10 18:13:50,405 - INFO - SSH 隧道建立成功! 映射 StarRocks 4.0 端口 9030 - 本地 58569 2025-12-10 18:13:50,405 - INFO - 正在连接 StarRocks 4.0.2 ... 2025-12-10 18:13:50,474 - INFO - 正在执行: [init_db_and_table] ... 2025-12-10 18:13:50,554 - INFO - 数据库 python_perf_test_sr4 和表 student_scores_perf_sr4 已初始化 (Replication3) 2025-12-10 18:13:50,554 - INFO - 执行完成: [init_db_and_table] | 耗时: 0.0798 秒 2025-12-10 18:13:50,554 - INFO - 正在执行: [insert_data_batch] ... 2025-12-10 18:14:01,150 - INFO - 成功插入 100 条数据 2025-12-10 18:14:01,150 - INFO - 执行完成: [insert_data_batch] | 耗时: 10.5959 秒 2025-12-10 18:14:02,151 - INFO - 正在执行: [query_and_log] ... 2025-12-10 18:14:02,192 - INFO - --- [插入后] 当前总行数: 100 --- 2025-12-10 18:14:02,192 - INFO - Row: {id: 1, name: User_001, age: 26, score: 81, update_time: datetime.datetime(2025, 12, 10, 18, 13, 51)} 2025-12-10 18:14:02,192 - INFO - Row: {id: 2, name: User_002, age: 29, score: 55, update_time: datetime.datetime(2025, 12, 10, 18, 13, 51)} 2025-12-10 18:14:02,192 - INFO - Row: {id: 3, name: User_003, age: 21, score: 63, update_time: datetime.datetime(2025, 12, 10, 18, 13, 51)} 2025-12-10 18:14:02,192 - INFO - 执行完成: [query_and_log] | 耗时: 0.0411 秒 2025-12-10 18:14:02,193 - INFO - 正在执行: [update_random_data] ... 2025-12-10 18:14:02,330 - INFO - - 更新 ID4, New Score98 2025-12-10 18:14:02,425 - INFO - - 更新 ID57, New Score100 2025-12-10 18:14:02,529 - INFO - - 更新 ID29, New Score95 2025-12-10 18:14:02,623 - INFO - - 更新 ID8, New Score95 2025-12-10 18:14:02,727 - INFO - - 更新 ID42, New Score100 2025-12-10 18:14:02,727 - INFO - 执行完成: [update_random_data] | 耗时: 0.5346 秒 2025-12-10 18:14:03,728 - INFO - 正在执行: [query_and_log] ... 2025-12-10 18:14:03,752 - INFO - --- [更新后] 当前总行数: 100 --- 2025-12-10 18:14:03,753 - INFO - Row: {id: 1, name: User_001, age: 26, score: 81, update_time: datetime.datetime(2025, 12, 10, 18, 13, 51)} 2025-12-10 18:14:03,753 - INFO - Row: {id: 2, name: User_002, age: 29, score: 55, update_time: datetime.datetime(2025, 12, 10, 18, 13, 51)} 2025-12-10 18:14:03,753 - INFO - Row: {id: 3, name: User_003, age: 21, score: 63, update_time: datetime.datetime(2025, 12, 10, 18, 13, 51)} 2025-12-10 18:14:03,753 - INFO - 执行完成: [query_and_log] | 耗时: 0.0251 秒 2025-12-10 18:14:03,753 - INFO - 正在执行: [delete_random_data] ... 2025-12-10 18:14:03,928 - INFO - - 删除 ID41 2025-12-10 18:14:04,020 - INFO - - 删除 ID54 2025-12-10 18:14:04,116 - INFO - - 删除 ID92 2025-12-10 18:14:04,220 - INFO - - 删除 ID59 2025-12-10 18:14:04,324 - INFO - - 删除 ID20 2025-12-10 18:14:04,324 - INFO - 执行完成: [delete_random_data] | 耗时: 0.5706 秒 2025-12-10 18:14:05,324 - INFO - 正在执行: [query_and_log] ... 2025-12-10 18:14:05,343 - INFO - --- [删除后] 当前总行数: 95 --- 2025-12-10 18:14:05,358 - INFO - Row: {id: 1, name: User_001, age: 26, score: 81, update_time: datetime.datetime(2025, 12, 10, 18, 13, 51)} 2025-12-10 18:14:05,358 - INFO - Row: {id: 2, name: User_002, age: 29, score: 55, update_time: datetime.datetime(2025, 12, 10, 18, 13, 51)} 2025-12-10 18:14:05,358 - INFO - Row: {id: 3, name: User_003, age: 21, score: 63, update_time: datetime.datetime(2025, 12, 10, 18, 13, 51)} 2025-12-10 18:14:05,359 - INFO - 执行完成: [query_and_log] | 耗时: 0.0345 秒 2025-12-10 18:14:05,359 - INFO - StarRocks 4.0.2 基本功能测试全部通过 2025-12-10 18:14:05,359 - INFO - 数据库连接已关闭 2025-12-10 18:14:05,460 - INFO - SSH 隧道已关闭去nd4执行下述语句进行查看mysql -h 10.x.xx.204 -P 9030 -u rootshow databases; use python_perf_test_sr4; show tables;select * from student_scores_perf_sr4;8.1 插入数据这里展示部分数据可以看出插入了100条测试数据8.2 更新数据控制台打印结果如下终端验证部分数据展示如下8.3 删除数据控制台打印结果如下终端验证部分数据展示如下9. Paimon外表数据写入StarRocks内表基础测试9.1 Paimon数据准备对于Paimon外表数据写入StarRocks内表基础测试需要提前在Flink SQL会话里面创建Paimon表并插入测试数据。-- 1. 创建 Flink 端的 Paimon Catalog CREATE CATALOG paimon_catalog WITH ( type paimon, warehouse hdfs:///user/hive/warehouse, metastore hive, hive-conf-dir /etc/hive/conf.cloudera.hive ); -- 2. 切换 Catalog 和 Database USE CATALOG my_paimon; CREATE DATABASE IF NOT EXISTS ods; USE ods; -- 3. 创建 Paimon 表 (源表) -- 这是一个记录用户行为的日志表 CREATE TABLE IF NOT EXISTS paimon_source_event ( user_id INT, item_id INT, behavior STRING, dt STRING, ts TIMESTAMP(3), PRIMARY KEY (dt, user_id, item_id) NOT ENFORCED ) PARTITIONED BY (dt) WITH ( bucket 1, file.format parquet ); -- 4. 写入测试数据 (Batch 模式写入) INSERT INTO paimon_source_event VALUES (1001, 501, click, 2025-12-12, TIMESTAMP 2025-12-12 10:00:00.123), (1002, 502, view, 2025-12-12, TIMESTAMP 2025-12-12 10:05:00.456), (1003, 501, buy, 2025-12-12, TIMESTAMP 2025-12-12 10:10:00.789), (1001, 503, view, 2025-12-13, TIMESTAMP 2025-12-13 11:00:00.000), (1004, 501, click, 2025-12-13, TIMESTAMP 2025-12-13 11:05:00.000);9.2 测试代码将下述测试代码保存为doris4_paimon_tel_test.py【python解释器3.8.20、windows系统11】# -*- coding: utf-8 -*- import pymysql import time import logging import functools from sshtunnel import SSHTunnelForwarder # 配置信息 # 1. SSH 连接信息 (参考 starrocks4_test.py) SSH_HOST 10.x.xx.204 # StarRocks FE Leader IP (nd4) SSH_PORT 22 SSH_USER xxxxx SSH_PASSWORD xxxxxxxxxxxxxxxxx # 2. StarRocks 数据库连接信息 SR_LOCAL_HOST 127.0.0.1 SR_QUERY_PORT 9030 # StarRocks 默认查询端口 SR_DB_USER root SR_DB_PWD # 3. Paimon Catalog 配置 # StarRocks 创建 Paimon Catalog 语法略有不同 CATALOG_PROPS { type: paimon, paimon.catalog.type: hive, # StarRocks 这里通常填 hive 代表基于 HMS hive.metastore.uris: thrift://nd1:9083,thrift://nd3:9083, warehouse: hdfs://nd1:8020/user/hive/warehouse # StarRocks 通常不需要显式指定 hadoop.conf.dir除非有特殊 HA 配置 } # 4. 业务配置 PAIMON_CATALOG_NAME paimon_catalog_sr # 避免与 Doris 测试混淆改个名 PAIMON_DB ods PAIMON_TABLE paimon_source_event SR_TEST_DB starrocks_paimon_test_db SR_TARGET_TABLE sr_target_event_sink LOG_FILE starrocks4_paimon_etl_report.log # 日志与工具模块 def setup_logger(): logger logging.getLogger(StarRocksPaimonTester) logger.setLevel(logging.INFO) if logger.hasHandlers(): logger.handlers.clear() formatter logging.Formatter(%(asctime)s - %(levelname)s - %(message)s) file_handler logging.FileHandler(LOG_FILE, modew, encodingutf-8) file_handler.setFormatter(formatter) console_handler logging.StreamHandler() console_handler.setFormatter(formatter) logger.addHandler(file_handler) logger.addHandler(console_handler) return logger logger setup_logger() def measure_time(func): functools.wraps(func) def wrapper(*args, **kwargs): start_time time.time() logger.info(f正在执行: [{func.__name__}] ...) try: result func(*args, **kwargs) duration time.time() - start_time logger.info(f执行完成: [{func.__name__}] | 耗时: {duration:.4f} 秒) return result except Exception as e: duration time.time() - start_time logger.error(f执行失败: [{func.__name__}] | 耗时: {duration:.4f} 秒 | 错误: {e}) raise e return wrapper # 核心测试逻辑 measure_time def init_starrocks_catalog(cursor): 在 StarRocks 中初始化 Paimon Catalog logger.info(f正在初始化 StarRocks Catalog: {PAIMON_CATALOG_NAME} ...) # 1. 删除旧 Catalog # StarRocks 删除 Catalog 语法: DROP CATALOG name cursor.execute(fDROP CATALOG IF EXISTS {PAIMON_CATALOG_NAME}) # 2. 创建 Catalog # StarRocks 语法: CREATE EXTERNAL CATALOG ... props_str ,\n.join([f{k} {v} for k, v in CATALOG_PROPS.items()]) create_sql f CREATE EXTERNAL CATALOG {PAIMON_CATALOG_NAME} PROPERTIES ( {props_str} ); logger.info(f发送 Create Catalog 请求:\n{create_sql}) cursor.execute(create_sql) logger.info(Catalog 创建成功) time.sleep(2) # 等待元数据加载 measure_time def check_paimon_source(cursor): 验证 StarRocks 是否能读取 Paimon 数据 logger.info(f检查 Paimon 数据源: {PAIMON_CATALOG_NAME}.{PAIMON_DB}.{PAIMON_TABLE}) # 【差异点】StarRocks 切换 Catalog 使用 SET CATALOG cursor.execute(fSET CATALOG {PAIMON_CATALOG_NAME}) cursor.execute(fUSE {PAIMON_DB}) cursor.execute(SHOW TABLES) tables [list(row.values())[0] for row in cursor.fetchall()] if PAIMON_TABLE not in tables: logger.warning(f当前 Catalog 下的表: {tables}) raise Exception(fPaimon 表 {PAIMON_TABLE} 未找到) # 预览数据 sql fSELECT * FROM {PAIMON_TABLE} ORDER BY dt, user_id LIMIT 5 cursor.execute(sql) results cursor.fetchall() logger.info(fPaimon 数据预览 (前5条):) for row in results: logger.info(row) if not results: raise Exception(Paimon 表为空请先在 Flink 端写入数据) return len(results) measure_time def create_starrocks_target_table(cursor): 创建 StarRocks 4.0 内部表 (Primary Key 模型) # 【差异点】切回内部 CatalogStarRocks 默认为 default_catalog cursor.execute(SET CATALOG default_catalog) cursor.execute(fCREATE DATABASE IF NOT EXISTS {SR_TEST_DB}) cursor.execute(fUSE {SR_TEST_DB}) # StarRocks 4.0 推荐使用 Primary Key 模型 # Primary Key 模型不强制要求 Key 列在最前但为了规范我们保持顺序 create_sql f CREATE TABLE IF NOT EXISTS {SR_TARGET_TABLE} ( user_id INT COMMENT 用户ID, item_id INT COMMENT 商品ID, dt VARCHAR(20) COMMENT 日期分区, behavior VARCHAR(50) COMMENT 行为类型, ts DATETIME COMMENT 时间戳 ) PRIMARY KEY(user_id, item_id, dt) PARTITION BY LIST(dt) ( PARTITION p20251212 VALUES IN (2025-12-12), PARTITION p20251213 VALUES IN (2025-12-13) ) DISTRIBUTED BY HASH(user_id) BUCKETS 3 PROPERTIES ( replication_num 3, enable_persistent_index true ); cursor.execute(create_sql) cursor.execute(fTRUNCATE TABLE {SR_TARGET_TABLE}) logger.info(fStarRocks 内表 {SR_TARGET_TABLE} (Primary Key) 已准备就绪) measure_time def execute_etl_paimon_to_starrocks(cursor): 执行 INSERT INTO ... SELECT ... logger.info( 开始执行从 Paimon 到 StarRocks 的数据导入 (ETL) ) # 确保在 default_catalog 下执行或者全路径引用 # 这里使用全路径引用更安全 # 目标: default_catalog.db.table # 源: paimon_catalog.db.table etl_sql f INSERT INTO default_catalog.{SR_TEST_DB}.{SR_TARGET_TABLE} (user_id, item_id, dt, behavior, ts) SELECT user_id, item_id, dt, behavior, ts FROM {PAIMON_CATALOG_NAME}.{PAIMON_DB}.{PAIMON_TABLE} cursor.execute(etl_sql) logger.info(ETL SQL 提交完毕) measure_time def verify_data_consistency(cursor): 验证数据一致性 logger.info( 开始数据一致性校验 ) # 1. Paimon 源数据量 cursor.execute(fSELECT count(*) as cnt FROM {PAIMON_CATALOG_NAME}.{PAIMON_DB}.{PAIMON_TABLE}) paimon_count cursor.fetchone()[cnt] # 2. StarRocks 目标数据量 cursor.execute(fSELECT count(*) as cnt FROM default_catalog.{SR_TEST_DB}.{SR_TARGET_TABLE}) sr_count cursor.fetchone()[cnt] logger.info(fPaimon 源表行数: {paimon_count}) logger.info(fStarRocks 目标表行数: {sr_count}) if paimon_count sr_count: logger.info(✅ 测试通过数据条数一致) else: logger.error(❌ 测试失败数据条数不一致) # 3. 抽样展示 cursor.execute(fSELECT * FROM default_catalog.{SR_TEST_DB}.{SR_TARGET_TABLE} ORDER BY dt, user_id LIMIT 3) rows cursor.fetchall() logger.info(StarRocks 内表数据抽样:) for row in rows: logger.info(row) # 主流程 def main_process(): server None conn None try: logger.info( 1. 正在建立 SSH 隧道 (连接至 StarRocks 4.0 nd4) ...) # 建立隧道 server SSHTunnelForwarder( (SSH_HOST, SSH_PORT), ssh_usernameSSH_USER, ssh_passwordSSH_PASSWORD, remote_bind_address(SR_LOCAL_HOST, SR_QUERY_PORT) ) server.start() logger.info(f SSH 隧道建立成功! 本地端口: {server.local_bind_port}) logger.info( 2. 连接 StarRocks 数据库 ...) conn pymysql.connect( host127.0.0.1, portserver.local_bind_port, userSR_DB_USER, passwordSR_DB_PWD, charsetutf8mb4, cursorclasspymysql.cursors.DictCursor, autocommitTrue ) cursor conn.cursor() # --- 测试步骤 --- # 步骤 1: 初始化 Catalog init_starrocks_catalog(cursor) # 步骤 2: 确认源端 Paimon 可读 check_paimon_source(cursor) # 步骤 3: 准备 StarRocks 目标表 create_starrocks_target_table(cursor) # 步骤 4: 执行导入 execute_etl_paimon_to_starrocks(cursor) # StarRocks 写入速度极快稍微等待事务可见 time.sleep(2) # 步骤 5: 校验结果 verify_data_consistency(cursor) logger.info( StarRocks 4.0 与 Paimon 集成测试全部完成 ) except Exception as e: logger.error(f主流程发生错误: {e}) import traceback logger.error(traceback.format_exc()) finally: if conn: conn.close() logger.info(数据库连接已关闭) if server: server.stop() logger.info(SSH 隧道已关闭) if __name__ __main__: main_process()对应的log文件内容如下2025-12-12 16:39:49,820 - INFO - 1. 正在建立 SSH 隧道 (连接至 StarRocks 4.0 nd4) ... 2025-12-12 16:39:50,103 - INFO - SSH 隧道建立成功! 本地端口: 61825 2025-12-12 16:39:50,104 - INFO - 2. 连接 StarRocks 数据库 ... 2025-12-12 16:39:50,154 - INFO - 正在执行: [init_starrocks_catalog] ... 2025-12-12 16:39:50,154 - INFO - 正在初始化 StarRocks Catalog: paimon_catalog_sr ... 2025-12-12 16:39:50,181 - INFO - 发送 Create Catalog 请求: CREATE EXTERNAL CATALOG paimon_catalog_sr PROPERTIES ( type paimon, paimon.catalog.type hive, hive.metastore.uris thrift://nd1:9083,thrift://nd3:9083, warehouse hdfs://nd1:8020/user/hive/warehouse ); 2025-12-12 16:39:50,219 - INFO - Catalog 创建成功 2025-12-12 16:39:52,219 - INFO - 执行完成: [init_starrocks_catalog] | 耗时: 2.0656 秒 2025-12-12 16:39:52,220 - INFO - 正在执行: [check_paimon_source] ... 2025-12-12 16:39:52,220 - INFO - 检查 Paimon 数据源: paimon_catalog_sr.ods.paimon_source_event 2025-12-12 16:39:52,672 - INFO - Paimon 数据预览 (前5条): 2025-12-12 16:39:52,672 - INFO - {user_id: 1001, item_id: 501, behavior: click, dt: 2025-12-12, ts: datetime.datetime(2025, 12, 12, 10, 0, 0, 123000)} 2025-12-12 16:39:52,673 - INFO - {user_id: 1002, item_id: 502, behavior: view, dt: 2025-12-12, ts: datetime.datetime(2025, 12, 12, 10, 5, 0, 456000)} 2025-12-12 16:39:52,673 - INFO - {user_id: 1003, item_id: 501, behavior: buy, dt: 2025-12-12, ts: datetime.datetime(2025, 12, 12, 10, 10, 0, 789000)} 2025-12-12 16:39:52,673 - INFO - {user_id: 1001, item_id: 503, behavior: view, dt: 2025-12-13, ts: datetime.datetime(2025, 12, 13, 11, 0)} 2025-12-12 16:39:52,674 - INFO - {user_id: 1004, item_id: 501, behavior: click, dt: 2025-12-13, ts: datetime.datetime(2025, 12, 13, 11, 5)} 2025-12-12 16:39:52,674 - INFO - 执行完成: [check_paimon_source] | 耗时: 0.4546 秒 2025-12-12 16:39:52,674 - INFO - 正在执行: [create_starrocks_target_table] ... 2025-12-12 16:39:52,732 - INFO - StarRocks 内表 sr_target_event_sink (Primary Key) 已准备就绪 2025-12-12 16:39:52,733 - INFO - 执行完成: [create_starrocks_target_table] | 耗时: 0.0581 秒 2025-12-12 16:39:52,733 - INFO - 正在执行: [execute_etl_paimon_to_starrocks] ... 2025-12-12 16:39:52,733 - INFO - 开始执行从 Paimon 到 StarRocks 的数据导入 (ETL) 2025-12-12 16:39:56,177 - INFO - ETL SQL 提交完毕 2025-12-12 16:39:56,177 - INFO - 执行完成: [execute_etl_paimon_to_starrocks] | 耗时: 3.4447 秒 2025-12-12 16:39:58,178 - INFO - 正在执行: [verify_data_consistency] ... 2025-12-12 16:39:58,178 - INFO - 开始数据一致性校验 2025-12-12 16:39:58,341 - INFO - Paimon 源表行数: 5 2025-12-12 16:39:58,341 - INFO - StarRocks 目标表行数: 5 2025-12-12 16:39:58,342 - INFO - ✅ 测试通过数据条数一致 2025-12-12 16:39:58,373 - INFO - StarRocks 内表数据抽样: 2025-12-12 16:39:58,374 - INFO - {user_id: 1001, item_id: 501, dt: 2025-12-12, behavior: click, ts: datetime.datetime(2025, 12, 12, 10, 0, 0, 123000)} 2025-12-12 16:39:58,374 - INFO - {user_id: 1002, item_id: 502, dt: 2025-12-12, behavior: view, ts: datetime.datetime(2025, 12, 12, 10, 5, 0, 456000)} 2025-12-12 16:39:58,374 - INFO - {user_id: 1003, item_id: 501, dt: 2025-12-12, behavior: buy, ts: datetime.datetime(2025, 12, 12, 10, 10, 0, 789000)} 2025-12-12 16:39:58,374 - INFO - 执行完成: [verify_data_consistency] | 耗时: 0.1962 秒 2025-12-12 16:39:58,374 - INFO - StarRocks 4.0 与 Paimon 集成测试全部完成 2025-12-12 16:39:58,374 - INFO - 数据库连接已关闭 2025-12-12 16:39:58,402 - INFO - SSH 隧道已关闭9.3 验证数据去StarRocks终端验证数据结果如下mysql -h 10.x.xx.204 -P 9030 -uroot执行下述sql-- 1. 切换到默认内部 Catalog (必须使用 SET CATALOG) SET CATALOG default_catalog; -- 2. 切换到测试数据库 (脚本中定义的 SR_TEST_DB) USE starrocks_paimon_test_db; -- 3. 查看表 SHOW TABLES; -- 预期输出: sr_target_event_sink -- 4. 查询数据 SELECT * FROM sr_target_event_sink ORDER BY user_id; -- 5. 查看数据总量 SELECT count(*) FROM sr_target_event_sink;StarRocks 支持通过 catalog.db.table 的全路径方式进行跨源查询这样不用频繁切换 Catalog。SELECT Paimon Source as source_type, count(*) as cnt FROM paimon_catalog_sr.ods.paimon_source_event UNION ALL SELECT StarRocks Target as source_type, count(*) as cnt FROM default_catalog.starrocks_paimon_test_db.sr_target_event_sink;

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询