Skip to content

Instantly share code, notes, and snippets.

View whitespy's full-sized avatar
🏠
Working from home

Andrey Butenko whitespy

🏠
Working from home
View GitHub Profile
if __name__ == "__main__":
base_manager = RedisBaseManager()
print(id(base_manager))
print(id(base_manager.redis_connection))
base_manager = RedisBaseManager()
print(id(base_manager))
print(id(base_manager.redis_connection))
@whitespy
whitespy / py
Created January 23, 2025 14:03
pubsub.py
async with app["redis_connection"].pubsub() as pubsub:
await pubsub.psubscribe("applicants:*")
try:
while True:
message = await pubsub.get_message(ignore_subscribe_messages=True)
if message is not None:
for ws in app["websockets"].get(message["channel"], []):
if not ws.closed:
await ws.send_json(message["data"])
# Ця таска буде працюати у фоні.
# Приклад https://docs.aiohttp.org/en/stable/web_advanced.html#background-tasks
async def dispatch_redis_pubsub_messages(app):
async with app["redis_connection"].pubsub() as pubsub:
await pubsub.psubscribe("user:*")
try:
while True:
try:
message = await pubsub.get_message(ignore_subscribe_messages=True)
2
3
4
5
6
class AssignmentCellSubmission(CoreModel):
assignment_submission = models.ForeignKey(
AssignmentSubmission,
on_delete=models.CASCADE,
related_name="cell_submissions",
verbose_name=_("assignment submission"),
)
assignment_cell = models.ForeignKey(
"courses.AssignmentCell",
on_delete=models.CASCADE,
{
"uuid": "941a4e17-eaa8-4f0e-a4ef-39c502eeb6b2",
"type": "code",
"subtype": "autograded_answer",
"points": null,
"source": [
"import threading\n",
"\n",
"### BEGIN SOLUTION\n",
"a = 0\n",
class CourseQuerySet(CoreQuerySet):
def annotate_by_user_flags(self, user):
return self.annotate(
is_user_course_editor=Case(
When(
Exists(
OrganizationMember.objects.filter(
organization=OuterRef("campus__organization"), user=user, role=UserRole.ADMIN
)
)
class OrganizationViewSet(mixins.CreateModelMixin, mixins.RetrieveModelMixin, mixins.ListModelMixin, GenericViewSet):
lookup_field = "slug"
lookup_url_kwarg = "organization_slug"
queryset = Organization.objects.none()
permission_classes = [IsAuthenticated]
serializer_class = OrganizationSerializer
def get_queryset(self):
return self.request.user.membership_organizations.all()
class OrganizationCreateListAPIView(GenericAPIView):
queryset = Organization.objects.all()
serializer_class = OrganizationSerializer
response_serializer_class = OrganizationSerializer
permission_classes = [IsAuthenticated]
def get(self, request, *args, **kwargs):
organization_service = OrganizationService()
organizations = organization_service.get_organizations(self.request.user.pk)
serializer = OrganizationSerializer(organizations, many=True)
"third_party_services":[
{
"name": "Spotlight",
"uuid": "2ebebb64-e98b-4a6c-8fd4-d26592f74a25",
"provider":{
"name": "Spotlight Verlag",
"currency":{"code": "CHF", "name": "Schweizer Franken"}
},
"subscription_option":{
"name": "E-Paper",