微信手机网站开发,网站推广与搜索引擎优化,应用公园是收费还是免费的,网上招聘网站开发报告大家好#xff0c;我是jobleap.cn的小九。
psycopg2-binary 是 Python 连接 PostgreSQL 数据库的核心库#xff08;psycopg2 的预编译二进制版本#xff0c;无需编译依赖#xff0c;开箱即用#xff09;#xff0c;本文将从环境准备、核心 API 讲解到实战案例#xff0c…大家好我是jobleap.cn的小九。psycopg2-binary 是 Python 连接 PostgreSQL 数据库的核心库psycopg2的预编译二进制版本无需编译依赖开箱即用本文将从环境准备、核心 API 讲解到实战案例全面串联其常用用法帮助你掌握 PostgreSQL 数据库的 Python 操作全流程。一、环境准备1. 安装 psycopg2-binary使用 pip 快速安装推荐 Python 3.6 版本# 安装最新版pip install psycopg2-binary# 安装指定版本如适配特定 PostgreSQL 版本pip install psycopg2-binary2.9.92. 前置条件已安装并启动 PostgreSQL 服务本地/远程拥有可访问的 PostgreSQL 数据库、用户名和密码确保目标数据库端口默认 5432未被防火墙拦截。二、核心概念与基础 APIpsycopg2 的核心操作围绕连接Connection和游标Cursor展开Connection负责与 PostgreSQL 数据库建立网络连接管理事务Cursor基于连接创建的操作句柄用于执行 SQL 语句、获取查询结果。1. 数据库连接connect()psycopg2.connect()是创建数据库连接的核心函数支持通过参数或 DSN 字符串传参常用参数如下参数说明默认值host数据库服务器地址localhostport数据库端口5432dbname/database目标数据库名-user数据库用户名当前系统用户password数据库密码-sslmodeSSL 连接模式如 requiredisable基础连接示例importpsycopg2frompsycopg2importOperationalErrordefcreate_connection(db_name,db_user,db_password,db_host,db_port):创建数据库连接并返回 Connection 对象connectionNonetry:connectionpsycopg2.connect(databasedb_name,userdb_user,passworddb_password,hostdb_host,portdb_port,)print(PostgreSQL 连接成功 ✅)exceptOperationalErrorase:print(f连接失败 ❌{e})returnconnection# 替换为你的数据库信息conncreate_connection(db_nametest_db,db_userpostgres,db_password123456,db_hostlocalhost,db_port5432)2. 游标创建与 SQL 执行cursor()/execute()创建连接后需通过conn.cursor()创建游标再用游标执行 SQL 语句cursor.execute(sql, params)执行单条 SQL 语句支持参数化查询cursor.executemany(sql, params_list)批量执行相同结构的 SQL 语句psycopg2.extras.execute_batch(cursor, sql, params_list, page_size100)高性能批量执行推荐替代executemany。1创建数据表示例defcreate_table(connection):创建用户表userscreate_table_query CREATE TABLE IF NOT EXISTS users ( id SERIAL PRIMARY KEY, name VARCHAR(50) NOT NULL, age INT, email VARCHAR(100) UNIQUE, create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); try:# 创建游标cursorconnection.cursor()# 执行 SQLcursor.execute(create_table_query)# 提交事务psycopg2 默认关闭自动提交必须手动提交connection.commit()print(数据表创建成功 ✅)exceptExceptionase:print(f创建表失败 ❌{e})# 异常时回滚事务connection.rollback()finally:# 关闭游标避免资源泄漏cursor.close()# 调用创建表函数ifconn:create_table(conn)2参数化查询防 SQL 注入关键psycopg2 使用%s作为占位符而非 Python 的{}或%参数需以元组/列表传入。definsert_single_user(connection,name,age,email):插入单条用户数据参数化查询insert_query INSERT INTO users (name, age, email) VALUES (%s, %s, %s) ON CONFLICT (email) DO NOTHING; # 避免重复插入 try:cursorconnection.cursor()# 传入参数元组形式cursor.execute(insert_query,(name,age,email))connection.commit()print(f插入用户{name}成功 ✅)exceptExceptionase:print(f插入失败 ❌{e})connection.rollback()finally:cursor.close()# 插入单条数据ifconn:insert_single_user(conn,张三,25,zhangsanexample.com)3批量插入数据frompsycopg2importextrasdefbatch_insert_users(connection,users_list):批量插入用户数据高性能版insert_query INSERT INTO users (name, age, email) VALUES (%s, %s, %s) ON CONFLICT (email) DO NOTHING; try:cursorconnection.cursor()# 高性能批量执行page_size 控制每次批量提交的条数extras.execute_batch(cursor,insert_query,users_list,page_size100)connection.commit()print(f批量插入{len(users_list)}条数据成功 ✅)exceptExceptionase:print(f批量插入失败 ❌{e})connection.rollback()finally:cursor.close()# 批量插入示例数据ifconn:users_data[(李四,28,lisiexample.com),(王五,30,wangwuexample.com),(赵六,22,zhaoliuexample.com)]batch_insert_users(conn,users_data)3. 数据查询fetchone()/fetchmany()/fetchall()执行查询类 SQLSELECT后需通过游标获取结果cursor.fetchone()获取下一条结果返回元组无数据时返回 Nonecursor.fetchmany(size)获取指定条数的结果返回列表元素为元组cursor.fetchall()获取所有剩余结果返回列表元素为元组cursor.rowcount返回受上一条 SQL 影响的行数查询时为匹配的行数。查询示例defquery_users(connection,age_min0):查询年龄大于等于 age_min 的用户query SELECT id, name, age, email, create_time FROM users WHERE age %s; try:cursorconnection.cursor()cursor.execute(query,(age_min,))# 方式1获取单条数据# single_user cursor.fetchone()# if single_user:# print(单条结果, single_user)# 方式2获取指定条数如2条# partial_users cursor.fetchmany(2)# print(部分结果, partial_users)# 方式3获取所有结果all_userscursor.fetchall()print(f\n查询到{cursor.rowcount}条符合条件的用户)foruserinall_users:# 解析元组id, name, age, email, create_timeprint(fID:{user[0]}, 姓名:{user[1]}, 年龄:{user[2]}, 邮箱:{user[3]}, 创建时间:{user[4]})exceptExceptionase:print(f查询失败 ❌{e})finally:cursor.close()# 查询年龄≥25的用户ifconn:query_users(conn,age_min25)4. 数据更新与删除更新/删除操作与插入逻辑一致需注意事务提交和参数化defupdate_user_age(connection,email,new_age):根据邮箱更新用户年龄update_query UPDATE users SET age %s WHERE email %s; try:cursorconnection.cursor()cursor.execute(update_query,(new_age,email))connection.commit()ifcursor.rowcount0:print(f更新{email}的年龄为{new_age}成功 ✅)else:print(f未找到邮箱为{email}的用户 ❌)exceptExceptionase:print(f更新失败 ❌{e})connection.rollback()finally:cursor.close()defdelete_user(connection,user_id):根据ID删除用户delete_queryDELETE FROM users WHERE id %s;try:cursorconnection.cursor()cursor.execute(delete_query,(user_id,))connection.commit()ifcursor.rowcount0:print(f删除ID为{user_id}的用户成功 ✅)else:print(f未找到ID为{user_id}的用户 ❌)exceptExceptionase:print(f删除失败 ❌{e})connection.rollback()finally:cursor.close()# 执行更新和删除ifconn:update_user_age(conn,zhangsanexample.com,26)delete_user(conn,3)# 删除ID为3的用户query_users(conn)# 重新查询验证结果5. 事务管理commit()/rollback()psycopg2 默认关闭「自动提交」模式所有修改类操作INSERT/UPDATE/DELETE/CREATE都需要手动调用conn.commit()确认若执行过程中出现异常需调用conn.rollback()回滚事务避免数据不一致。事务回滚示例deftest_transaction(connection):测试事务回滚try:cursorconnection.cursor()# 第一步插入数据cursor.execute(INSERT INTO users (name, age, email) VALUES (%s, %s, %s),(测试用户,99,testexample.com))# 第二步故意触发错误比如插入重复邮箱cursor.execute(INSERT INTO users (name, age, email) VALUES (%s, %s, %s),(重复用户,88,zhangsanexample.com))# 无异常则提交connection.commit()exceptExceptionase:print(f事务执行失败触发回滚 ❌{e})connection.rollback()# 回滚所有未提交的操作finally:cursor.close()# 测试事务回滚最终 测试用户 不会被插入ifconn:test_transaction(conn)query_users(conn)6. 类型转换PostgreSQL ↔ Pythonpsycopg2 会自动完成 PostgreSQL 类型与 Python 类型的转换常用映射关系如下PostgreSQL 类型Python 类型INT/SERIALintVARCHAR/TEXTstrTIMESTAMP/DATEdatetime.datetime/dateBOOLEANboolARRAYlistJSON/JSONBdict/list需导入 extrasJSON 类型操作示例frompsycopg2.extrasimportJsondeftest_json_type(connection):测试 JSON 类型字段操作# 1. 先添加 JSON 字段alter_queryALTER TABLE users ADD COLUMN IF NOT EXISTS info JSONB;# 2. 更新 JSON 数据update_queryUPDATE users SET info %s WHERE email %s;try:cursorconnection.cursor()cursor.execute(alter_query)# 传入 Python 字典通过 Json 封装user_info{hobby:[篮球,编程],address:北京市}cursor.execute(update_query,(Json(user_info),zhangsanexample.com))connection.commit()# 3. 查询 JSON 字段cursor.execute(SELECT name, info FROM users WHERE email %s;,(zhangsanexample.com,))resultcursor.fetchone()print(f\nJSON 字段查询结果)print(f姓名{result[0]}, 信息{result[1]})print(f提取 hobby{result[1][hobby]})# 直接按字典访问exceptExceptionase:print(fJSON 操作失败 ❌{e})connection.rollback()finally:cursor.close()ifconn:test_json_type(conn)7. 连接池生产环境必备频繁创建/关闭连接会消耗大量资源生产环境建议使用连接池psycopg2.pool复用连接frompsycopg2importpool# 创建连接池最小1个最大5个连接connection_poolpool.SimpleConnectionPool(minconn1,maxconn5,databasetest_db,userpostgres,password123456,hostlocalhost,port5432)defuse_pooled_connection():使用连接池获取连接# 从池获取连接connconnection_pool.getconn()ifconn:print(\n从连接池获取连接成功 ✅)query_users(conn)# 归还连接到池不是关闭connection_pool.putconn(conn)# 测试连接池use_pooled_connection()# 关闭连接池程序退出时connection_pool.closeall()8. 资源释放使用完连接和游标后必须关闭以释放资源推荐通过finally块确保执行# 最终关闭连接非连接池场景ifconn:try:conn.close()print(\n数据库连接已关闭 ✅)exceptExceptionase:print(f关闭连接失败 ❌{e})三、完整实战脚本串联所有 API以下脚本整合了上述所有常用操作可直接运行需替换数据库信息importpsycopg2frompsycopg2importOperationalError,extrasfrompsycopg2.extrasimportJsonfrompsycopg2importpool# 1. 创建数据库连接或连接池defcreate_connection(db_name,db_user,db_password,db_host,db_port):connectionNonetry:connectionpsycopg2.connect(databasedb_name,userdb_user,passworddb_password,hostdb_host,portdb_port,)print(PostgreSQL 连接成功 ✅)exceptOperationalErrorase:print(f连接失败 ❌{e})returnconnection# 2. 创建数据表defcreate_table(connection):create_table_query CREATE TABLE IF NOT EXISTS users ( id SERIAL PRIMARY KEY, name VARCHAR(50) NOT NULL, age INT, email VARCHAR(100) UNIQUE, create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, info JSONB ); try:cursorconnection.cursor()cursor.execute(create_table_query)connection.commit()print(数据表创建成功 ✅)exceptExceptionase:print(f创建表失败 ❌{e})connection.rollback()finally:cursor.close()# 3. 插入/更新/删除/查询definsert_user(connection,user_data):insert_queryINSERT INTO users (name, age, email) VALUES (%s, %s, %s) ON CONFLICT (email) DO NOTHING;try:cursorconnection.cursor()extras.execute_batch(cursor,insert_query,user_data,page_size100)connection.commit()print(f批量插入{len(user_data)}条数据成功 ✅)exceptExceptionase:print(f插入失败 ❌{e})connection.rollback()finally:cursor.close()defupdate_user_info(connection,email,info):update_queryUPDATE users SET info %s WHERE email %s;try:cursorconnection.cursor()cursor.execute(update_query,(Json(info),email))connection.commit()print(f更新{email}的扩展信息成功 ✅)exceptExceptionase:print(f更新失败 ❌{e})connection.rollback()finally:cursor.close()defquery_users(connection,age_min0):querySELECT id, name, age, email, info FROM users WHERE age %s;try:cursorconnection.cursor()cursor.execute(query,(age_min,))all_userscursor.fetchall()print(f\n查询到{cursor.rowcount}条用户数据)foruserinall_users:print(fID:{user[0]}, 姓名:{user[1]}, 年龄:{user[2]}, 邮箱:{user[3]}, 扩展信息:{user[4]})exceptExceptionase:print(f查询失败 ❌{e})finally:cursor.close()defdelete_user(connection,user_id):delete_queryDELETE FROM users WHERE id %s;try:cursorconnection.cursor()cursor.execute(delete_query,(user_id,))connection.commit()print(f删除ID为{user_id}的用户{成功ifcursor.rowcount0else失败}✅)exceptExceptionase:print(f删除失败 ❌{e})connection.rollback()finally:cursor.close()# 主流程if__name____main__:# 替换为你的数据库信息DB_CONFIG{db_name:test_db,db_user:postgres,db_password:123456,db_host:localhost,db_port:5432}# 创建连接conncreate_connection(**DB_CONFIG)ifnotconn:exit(1)# 执行核心操作create_table(conn)insert_user(conn,[(张三,25,zhangsanexample.com),(李四,28,lisiexample.com),(王五,30,wangwuexample.com)])update_user_info(conn,zhangsanexample.com,{hobby:[篮球,编程],address:北京市})query_users(conn,age_min25)delete_user(conn,3)query_users(conn,age_min25)# 关闭连接ifconn:conn.close()print(\n数据库连接已关闭 ✅)四、常见问题与注意事项SQL 注入风险严禁拼接 SQL 字符串必须使用%s占位符传参编码问题PostgreSQL 默认编码为 UTF8Python 脚本需确保编码一致连接超时远程连接时需设置connect_timeout参数如connect(..., connect_timeout10)大结果集处理避免使用fetchall()改用fetchone()或fetchmany()分批读取防止内存溢出版本兼容psycopg2-binary 版本需与 PostgreSQL 服务版本适配如 2.9.x 适配 PostgreSQL 12。五、总结psycopg2-binary 的核心流程可总结为创建连接 → 创建游标 → 执行 SQL → 处理结果 → 提交/回滚事务 → 释放资源掌握connect()、cursor()、execute()、commit()、fetch*()等核心 API结合参数化查询、事务管理和连接池即可安全、高效地实现 PostgreSQL 数据库的增删改查。生产环境中还需注意异常捕获、资源释放和性能优化如批量操作、索引设计确保系统稳定运行。