Skip to content

Instantly share code, notes, and snippets.

@wwnbb
Last active August 4, 2023 09:30
Show Gist options
  • Select an option

  • Save wwnbb/75d2ec862a4c630fca2ef2c65c0689ca to your computer and use it in GitHub Desktop.

Select an option

Save wwnbb/75d2ec862a4c630fca2ef2c65c0689ca to your computer and use it in GitHub Desktop.
Example: how to create many to many relatinships using sqlalchemy
FROM (SELECT users.id AS users_id
FROM users
WHERE users.id = $1::INTEGER) AS anon_1 JOIN user_group AS user_group_1 ON anon_1.users_id = user_group_1.user_id JOIN groups ON groups.id = user_group_1.group_id
2023-08-04 13:29:46,430 INFO sqlalchemy.engine.Engine [generated in 0.00010s] (31,)
2023-08-04 13:29:46,432 INFO sqlalchemy.engine.Engine ROLLBACK
Traceback (most recent call last):
File "/Users/admin/work/py/async_sqlalchemy/main.py", line 134, in <module>
main()
File "/Users/admin/work/py/async_sqlalchemy/main.py", line 130, in main
asyncio.run(amain())
File "/Users/admin/.pyenv/versions/3.11.1/lib/python3.11/asyncio/runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/Users/admin/.pyenv/versions/3.11.1/lib/python3.11/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/admin/.pyenv/versions/3.11.1/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/Users/admin/.pyenv/versions/3.11.1/lib/python3.11/contextlib.py", line 222, in __aexit__
await self.gen.athrow(typ, value, traceback)
File "/Users/admin/work/py/async_sqlalchemy/main.py", line 59, in _get_session
raise e
File "/Users/admin/work/py/async_sqlalchemy/main.py", line 56, in _get_session
yield session
File "/Users/admin/work/py/async_sqlalchemy/main.py", line 93, in amain
user1.groups.append(group2)
File "/Users/admin/work/py/async_sqlalchemy/.venv/lib/python3.11/site-packages/sqlalchemy/orm/collections.py", line 1130, in append
item = __set(self, item, _sa_initiator, NO_KEY)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/admin/work/py/async_sqlalchemy/.venv/lib/python3.11/site-packages/sqlalchemy/orm/collections.py", line 1095, in __set
item = executor.fire_append_event(item, _sa_initiator, key=key)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/admin/work/py/async_sqlalchemy/.venv/lib/python3.11/site-packages/sqlalchemy/orm/collections.py", line 687, in fire_append_event
return self.attr.fire_append_event(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/admin/work/py/async_sqlalchemy/.venv/lib/python3.11/site-packages/sqlalchemy/orm/attributes.py", line 1755, in fire_append_event
value = fn(state, value, initiator or self._append_token, key=key)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/admin/work/py/async_sqlalchemy/.venv/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 70, in append
sess._save_or_update_state(item_state)
File "/Users/admin/work/py/async_sqlalchemy/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3369, in _save_or_update_state
self._save_or_update_impl(st_)
File "/Users/admin/work/py/async_sqlalchemy/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 4013, in _save_or_update_impl
self._update_impl(state)
File "/Users/admin/work/py/async_sqlalchemy/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 4002, in _update_impl
self.identity_map.add(state)
File "/Users/admin/work/py/async_sqlalchemy/.venv/lib/python3.11/site-packages/sqlalchemy/orm/identity.py", line 195, in add
raise sa_exc.InvalidRequestError(
sqlalchemy.exc.InvalidRequestError: Can't attach instance <Group at 0x10704ec90>; another instance with key (<class '__main__.Group'>, (31,), None) is already present in this session
import asyncio
from sqlalchemy import Column, Integer, String, Table, ForeignKey, select
from sqlalchemy.ext.asyncio import (
create_async_engine,
AsyncSession,
async_scoped_session,
)
from contextlib import asynccontextmanager
from sqlalchemy.orm import selectinload, sessionmaker, relationship, subqueryload
from sqlalchemy.ext.declarative import declarative_base
DATABASE_URL = "postgresql+asyncpg://admin:@localhost:5432/asynqalchemy"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
Base = declarative_base()
# association table
user_group = Table(
"user_group",
Base.metadata,
Column("user_id", Integer, ForeignKey("users.id")),
Column("group_id", Integer, ForeignKey("groups.id")),
)
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
name = Column(String)
groups = relationship(
"Group", secondary=user_group, back_populates="users", lazy="raise"
)
class Group(Base):
__tablename__ = "groups"
id = Column(Integer, primary_key=True, index=True)
name = Column(String)
users = relationship(
"User", secondary=user_group, back_populates="groups", lazy="raise"
)
async def _get_session():
Session = async_scoped_session(
sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False),
scopefunc=asyncio.current_task,
)
session = Session()
try:
yield session
except Exception as e:
await session.rollback()
raise e
else:
await session.commit()
finally:
await session.close()
SessionManager = asynccontextmanager(_get_session)
async def amain():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async with SessionManager() as session:
# create new users and groups
user1 = User(name="Alice")
user2 = User(name="Bob")
group1 = Group(name="Admin")
group2 = Group(name="Regular")
# establish many-to-many relationships
user1.groups.append(group1)
user2.groups.extend([group1, group2])
session.add_all([user1, user2])
# commit the transaction
await session.commit()
async with SessionManager() as session:
session.expire_all()
user1 = await session.scalar(
select(User).options(subqueryload(User.groups)).where(User.id == user1.id)
)
user1.groups.append(group2)
await session.commit()
async with SessionManager() as session:
# query users and their groups
for user in (
await session.execute(select(User).options(subqueryload(User.groups)))
).scalars():
print(user.name, [group.name for group in user.groups])
async with SessionManager() as session:
# query users and their groups
for user in (
await session.execute(select(User).options(subqueryload(User.groups)))
).scalars():
print(user.name, [group.name for group in user.groups])
# Filtering by group name
async with SessionManager() as session:
# Filter users by group name
target_group_name = "Admin"
users_in_target_group = (
(
await session.execute(
select(User)
.join(User.groups)
.where(Group.name == target_group_name)
)
)
.scalars()
.all()
)
for user in users_in_target_group:
print(user.name)
def main():
asyncio.run(amain())
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment