Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
from pydantic import BaseModel
from jose import jwt
from jose.exceptions import JWTError
from kernelci.api.models import (

Check failure on line 49 in api/main.py

View workflow job for this annotation

GitHub Actions / Lint

Unable to import 'kernelci.api.models'
Node,
Hierarchy,
PublishEvent,
Expand Down Expand Up @@ -1412,7 +1412,13 @@
"""Listen messages from a subscribed Pub/Sub channel"""
metrics.add('http_requests_total', 1)
try:
return await pubsub.listen(sub_id, user.username)
result = await pubsub.listen(sub_id, user.username)
# Debug: print result type and check for bytes
print(f"[DEBUG listen] result type: {type(result)}")
if result:
for k, v in result.items():
print(f"[DEBUG listen] {k}: {type(v)} = {repr(v)[:100]}")
return result
except KeyError as error:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
Expand All @@ -1423,6 +1429,10 @@
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error while listening to sub id {sub_id}: {str(error)}"
) from error
except Exception as error:
print(f"[DEBUG listen] Unexpected: {type(error).__name__}: {error}")
traceback.print_exc()
raise


@app.post('/publish/{channel}')
Expand Down
19 changes: 17 additions & 2 deletions api/pubsub_mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,21 @@ async def _update_subscriber_state(self, subscriber_id: str,
{'$set': update}
)

@staticmethod
def _decode_redis_message(msg: Dict) -> Dict:
"""Decode Redis message bytes to strings for JSON serialization"""
return {
'type': msg.get('type'),
'pattern': (msg.get('pattern').decode('utf-8')
if msg.get('pattern') else None),
'channel': (msg['channel'].decode('utf-8')
if isinstance(msg['channel'], bytes)
else msg['channel']),
'data': (msg['data'].decode('utf-8')
if isinstance(msg['data'], bytes)
else msg['data']),
}

def _eventhistory_to_cloudevent(self, event: Dict) -> str:
"""Convert eventhistory document to CloudEvent JSON string

Expand Down Expand Up @@ -547,10 +562,10 @@ async def listen(self, sub_id: int,

# Filter by owner if not promiscuous
if sub.promiscuous:
return msg
return self._decode_redis_message(msg)
if 'owner' in msg_data and msg_data['owner'] != sub.user:
continue
return msg
return self._decode_redis_message(msg)

async def publish(self, channel: str, message: str):
"""Publish a message on a channel (Redis only, no durability)"""
Expand Down