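Your Python application can access a MongoDB database through PyMongo, the official MongoDB Python driver:
Install the latest version of the module: $ python -m pip install 'pymongo[srv]'
Install a specific version of the module: $ python -m pip install 'pymongo[srv]==3.11'
Upgrade to the latest version of the module: $ python -m pip install --upgrade 'pymongo[srv]'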
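After installing or upgrading, it can help to confirm which driver version the current interpreter actually sees, for example before relying on a feature tied to a particular release. The following is a minimal sketch that uses only the standard library; the helper name `installed_version` is hypothetical and not part of PyMongo.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string of `package`, or None if absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Prints the version string, or None when PyMongo is not installed
    # in the environment this interpreter belongs to.
    print(installed_version("pymongo"))
```

Running it with the same `python` executable used for `python -m pip install` avoids checking a different environment by mistake.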
import pymongo
# Replace the uri string with your MongoDB deployment's connection string.
conn_str = "mongodb+srv://<username>:<password>@<cluster-address>/test?retryWrites=true&w=majority"
# set a 5-second connection timeout
client = pymongo.MongoClient(conn_str, serverSelectionTimeoutMS=5000)
try:
    print(client.server_info())
except Exception:
    print("Unable to connect to the server.")
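When filling in the `<username>` and `<password>` placeholders, note that reserved characters such as `@`, `:` and `/` must be percent-encoded, otherwise the connection string cannot be parsed correctly. The sketch below shows one way to assemble the string with `urllib.parse.quote_plus`; the helper `build_srv_uri`, the credentials, and the host `cluster0.example.mongodb.net` are made-up placeholders for illustration.

```python
from urllib.parse import quote_plus, urlencode


def build_srv_uri(username, password, host, database="test", **options):
    """Assemble a mongodb+srv URI with percent-encoded credentials."""
    user = quote_plus(username)
    pwd = quote_plus(password)
    uri = f"mongodb+srv://{user}:{pwd}@{host}/{database}"
    if options:
        # Options are appended as a query string, in the order given.
        uri += "?" + urlencode(options)
    return uri


if __name__ == "__main__":
    # Placeholder values only; substitute your own deployment details.
    print(build_srv_uri("app_user", "p@ss:w/rd",
                        "cluster0.example.mongodb.net",
                        retryWrites="true", w="majority"))
```

The resulting string can be passed to `pymongo.MongoClient` in place of the hand-written `conn_str`.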
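You can use the Versioned API feature with MongoDB 5.0 and PyMongo driver version 3.12 or later. When you declare a Versioned API version, you can upgrade the driver or the database without worrying about backward-compatibility issues for the commands that API version covers. To use this feature, specify the Versioned API version when you create the client.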
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
# Replace <connection string> with your MongoDB deployment's connection string.
conn_str = "<connection string>"
# Set the version of the Versioned API on the client.
client = MongoClient(conn_str, server_api=ServerApi('1'), serverSelectionTimeoutMS=5000)
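To connect to a MongoDB server running on your local machine, use the connection string "mongodb://localhost:<port>", where <port> is the port number the server listens on.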
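Before pointing the driver at a local server, it can be useful to check that something is listening on the chosen port at all. The sketch below uses only the standard library; `local_uri` and `port_is_open` are hypothetical helpers, and 27017 is assumed here because it is MongoDB's default port. The check only confirms that a TCP connection can be opened, not that the listener is actually a MongoDB server.

```python
import socket


def local_uri(port=27017):
    """Build a connection string for a server on this machine."""
    return f"mongodb://localhost:{port}"


def port_is_open(port, host="localhost", timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution errors.
        return False


if __name__ == "__main__":
    port = 27017  # assumed default; change it if your server uses another port
    print(local_uri(port), "reachable:", port_is_open(port))
```

If the port is not reachable, the driver would otherwise wait for the full `serverSelectionTimeoutMS` before reporting a failure.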
Tutorial on using Motor with Tornado
Tutorial on using Motor with asyncio
Motor Documentation
Install the latest version of the module: $ python -m pip install motor
If you use the asyncio framework, you can use the following code:
import asyncio
import motor.motor_asyncio
async def get_server_info():
    # replace this with your MongoDB connection string
    conn_str = "<your MongoDB Atlas connection string>"
    # set a 5-second connection timeout
    client = motor.motor_asyncio.AsyncIOMotorClient(conn_str, serverSelectionTimeoutMS=5000)
    try:
        print(await client.server_info())
    except Exception:
        print("Unable to connect to the server.")
# asyncio.run creates the event loop, runs the coroutine, and closes the loop
asyncio.run(get_server_info())
If you use the Tornado framework, you can use the following code:
import tornado.ioloop
import motor.motor_tornado
async def get_server_info():
    # replace this with your MongoDB connection string
    conn_str = "<your MongoDB Atlas connection string>"
    # set a 5-second connection timeout
    client = motor.motor_tornado.MotorClient(conn_str, serverSelectionTimeoutMS=5000)
    try:
        print(await client.server_info())
    except Exception:
        print("Unable to connect to the server.")
tornado.ioloop.IOLoop.current().run_sync(get_server_info)
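To connect Motor to a MongoDB server running on your local machine, use the connection string "mongodb://localhost:<port>", where <port> is the port number the server listens on.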