Comments (8)
Okay, Thank you, used add_listener and solved this issue.
from tiktoklive.
Going to keep this open until I add the 'fix' for the way you were trying to do it.
I think your solution is fine, can you make a pull request?
from tiktoklive.
you have not given enough code, i can't reproduce with what you have provided.
from tiktoklive.
@isaackogan, here is my full code
from re import Match
from abc import ABC
from TikTokLive.client.client import TikTokLiveClient
from TikTokLive.events import ConnectEvent, DisconnectEvent, LiveEndEvent
from TikTokLive.events import proto_events, custom_events
import re
HANDLED_MESSAGE_EVENTS = [
proto_events.JoinEvent, proto_events.LikeEvent, proto_events.GiftEvent,
proto_events.EnvelopeEvent, proto_events.SubscribeEvent, custom_events.FollowEvent,
custom_events.ShareEvent, proto_events.EmoteChatEvent
]
LIVE_URL_REGEX = re.compile('^https?://(www\\.)?tiktok\\.com/@([^/]+)/live$')
LIVESTREAM_STOP_OR_FAILED_RESPONSE = -1
def extract_unique_id(stream_url: str):
match: Match = LIVE_URL_REGEX.match(stream_url)
if match:
return match.groups()[-1]
return None
class TikTokLiveStreamIterator(TikTokLiveClient):
def __init__(self, stream_url: str, **options):
TikTokLiveClient.__init__(self, unique_id=stream_url)
self.on(ConnectEvent, self._process_connect_event)
self.on(DisconnectEvent, self._process_disconnect_event)
self.on(LiveEndEvent, self._process_live_end_event)
for message_event in HANDLED_MESSAGE_EVENTS:
self.on(message_event, self._process_message_event)
async def _process_connect_event(self, event: ConnectEvent):
from datetime import datetime
output_path = f"{event.unique_id}-{datetime.utcnow()}.mp4"
from TikTokLive.client.web.routes.fetch_video import VideoFetchQuality
self.web.fetch_video.start(
output_fp=output_path,
room_info=self.room_info,
output_format="mp4",
quality=VideoFetchQuality.ORIGIN
)
print(f"Client connected.")
async def _process_disconnect_event(self, event: DisconnectEvent):
if self.web.fetch_video.is_recording:
self.web.fetch_video.stop()
print(f"Client disconnected!")
async def _process_live_end_event(self, event: LiveEndEvent):
print(f"Stream ended.")
async def _process_message_event(self, event):
print(f"\n\n**********************--- {event.get_type().lower()} ---******************************")
try:
print(f"{event.user.unique_id}")
except Exception as e:
print(e)
print(event.__dict__)
print("********************************************************************************\n\n")
async def start(self) -> None:
print(f"Starting iterator for unique id: {self.unique_id}.")
await TikTokLiveClient.start(self, fetch_room_info=True)
async def stop(self) -> None:
print(f"Stopping iterator for unique id: {self.unique_id}.")
TikTokLiveClient.remove_all_listeners(self)
await TikTokLiveClient.disconnect(self)
print(f"Stopped iterator for unique id: {self.unique_id}.")
@classmethod
def can_monitor_stream_url(cls, stream_url: str):
return LIVE_URL_REGEX.match(stream_url) is not None
import asyncio
async def main():
tt = TikTokLiveStreamIterator('https://www.tiktok.com/@wakbogel_2/live')
await tt.start()
await asyncio.sleep(60)
await tt.stop()
asyncio.run(main())
from tiktoklive.
Okay, so basically for the pattern you are trying to accomplish you should use client.add_listener
and not client.on
as that was intended strictly a decorator.
It's a little complicated why it doesn't work...I guess technically speaking it is a bug, but it was never intended to be used the way you did, so not sure if I should call it that.
I can support it being used functionally like that in the future, but for now, use the intended client.add_listener
instead and it works :)
from tiktoklive.
So, in this file TikTokLive/client/client.py code update in function add_listener
def add_listener(self, event: Type[Event], f: EventHandler) -> Handler:
"""
Method that can be used to register a Python function as an event listener
:param event: The event to listen to
:param f: The function to handle the event
:return: The generated `pyee.Handler` object
"""
if isinstance(event, str):
return super().add_listener(event=event, f=f)
return super().add_listener(event=event.get_type(), f=f)
And this change works for me, could you please update this change in you package?
from tiktoklive.
Thank you for your compliment, Pull request is here -pull-request
from tiktoklive.
Merged
from tiktoklive.
Related Issues (20)
- TikTokLive.client.errors.UserOfflineError HOT 2
- Gift examples doesnt work anymore "count" HOT 2
- SendMessage Method HOT 1
- httpx.Timeout Error HOT 4
- Unexpected error fetching when user is offline HOT 2
- Video download failed after few time later HOT 3
- Failed to fetch room id from Webcast, see stacktrace for more info. HOT 2
- GiftEvent triggered twice for one gift HOT 3
- pay_grade is always empty in RoomUserSeqEvent HOT 1
- pull request HOT 1
- TypeError: string indices must be integers, not 'str' HOT 1
- Passing room_id to the start method returns error from the sign API HOT 2
- Automatic off event data capture but user still live HOT 6
- Error when downloading livestream from 1 number of users HOT 1
- AttributeError: 'str' object has no attribute 'get_type' HOT 1
- Detecting user exit from live HOT 3
- raise mapped_exc(message) from exc httpx.ReadTimeout
- Failed request to Sign API with status code 503 and payload "b'{"code":503,"error":"API is not ready for connections yet."}'". HOT 1
- Get the previous user_id that comment before joined live
- Download stopped no exception
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from tiktoklive.