import argparse
import asyncio
import json
import logging
import os
import time
import wave
from aiohttp import web
import aiortc
from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc.mediastreams import (AudioFrame, AudioStreamTrack, VideoFrame,
VideoStreamTrack)
import psycopg2
import hashlib
# Database connection settings.  Every value can be overridden through an
# environment variable so real credentials do not have to be committed to the
# source tree; when a variable is unset the placeholder value this module has
# always used is kept, so existing deployments behave exactly as before.
DB_details = {
    "db": os.environ.get("RTC_DB_NAME", "db_name"),
    "user": os.environ.get("RTC_DB_USER", "usr_name"),
    "pass": os.environ.get("RTC_DB_PASSWORD", "password"),
    "host": os.environ.get("RTC_DB_HOST", "localhost"),
    "port": os.environ.get("RTC_DB_PORT", "5432"),
}

# Keyword/value connection string passed to psycopg2.connect() by the
# request handlers in this module.
connect_str = (
    "dbname={db} user={user} host={host} port={port} password={pass}"
    .format(**DB_details)
)
# Directory that contains this module; index.html is read from here.
ROOT = os.path.dirname(__file__)

# In-memory relay state: call id -> participant id -> that participant's
# queued 'audio' and 'video' frames plus a 'connected' flag.  A demo call
# 'call-1' with participants '1' and '2' is pre-registered.
rtc_io = {
    'call-1': {
        participant: {'audio': [], 'video': [], 'connected': False}
        for participant in ('1', '2')
    },
}
def md5(input_string):
    """Return the hexadecimal MD5 digest of *input_string* encoded as UTF-8."""
    digest = hashlib.md5()
    digest.update(input_string.encode('utf-8'))
    return digest.hexdigest()
async def pause(last, ptime):
    """Sleep until ``ptime`` seconds after ``last``, then return the current time.

    ``last`` is a ``time.time()`` timestamp from a previous call.  When it is
    falsy (``None`` or ``0``) no sleep is performed at all.  The returned
    value is ``time.time()`` taken after any sleep, suitable for passing as
    ``last`` on the next call.
    """
    if not last:
        return time.time()
    remaining = last + ptime - time.time()
    await asyncio.sleep(remaining)
    return time.time()
class AudioFileTrack(AudioStreamTrack):
    """Audio track whose frames are read from a WAV file, 160 frames at a time."""

    def __init__(self, path):
        # Opened for reading only; the reader stays open for the track's lifetime.
        self.reader = wave.open(path, 'rb')
        # Timestamp of the previous recv(); None until the first frame is sent.
        self.last = None

    async def recv(self):
        """Wait for the 0.02 s pacing slot, then return the next chunk of the file."""
        self.last = await pause(self.last, 0.02)
        wav = self.reader
        return AudioFrame(
            channels=wav.getnchannels(),
            data=wav.readframes(160),
            sample_rate=wav.getframerate(),
        )
class AudioRemoteTrack(AudioStreamTrack):
    """Audio track fed from another participant's audio queue in ``rtc_io``.

    ``call_id`` and ``remote_id`` select the queue
    ``rtc_io[call_id][remote_id]['audio']`` that recv() drains.
    """

    def __init__(self, call_id, remote_id):
        print("remote id ", remote_id)
        self.call_id = call_id
        self.remote_id = remote_id
        # Timestamp of the previous recv(); None until the first frame is sent.
        self.last = None
        # Returned whenever the queue is empty: 320 zero bytes, mono, 8000 Hz
        # (presumably 160 silent 16-bit samples -- confirm against AudioFrame).
        self.default_frame = AudioFrame(
            channels=1,
            data=b'\x00\x00' * 160,
            sample_rate=8000,
        )

    async def recv(self):
        """Return the oldest queued remote frame, or the default frame if none."""
        self.last = await pause(self.last, 0.02)
        queued = rtc_io[self.call_id][self.remote_id]['audio']
        if not queued:
            return self.default_frame
        return queued.pop(0)
class VideoRemoteTrack(VideoStreamTrack):
    """Video track fed from another participant's video queue in ``rtc_io``.

    ``call_id`` and ``remote_id`` select the queue
    ``rtc_io[call_id][remote_id]['video']`` that recv() drains.  While the
    queue is empty the most recently delivered frame is repeated (initially a
    blank 640x480 frame).
    """

    def __init__(self, call_id, remote_id):
        # Timestamp of the previous recv(); None until the first frame is sent.
        self.last = None
        self.call_id = call_id
        self.remote_id = remote_id
        # Frame to repeat when nothing is queued; replaced by each real frame.
        self.default_frame = VideoFrame(width=640, height=480)

    async def recv(self):
        """Return the next queued remote frame, or repeat the last one sent.

        The previous implementation printed the whole frame queue on every
        tick when ``int(remote_id) == 2``.  That debug output has been removed:
        it flooded stdout and the ``int()`` conversion raised ``ValueError``
        for any participant id that is not numeric.
        """
        self.last = await pause(self.last, 0.05)
        queued = rtc_io[self.call_id][self.remote_id]['video']
        if queued:
            # Remember the frame so it can be repeated while the queue is empty.
            self.default_frame = queued.pop(0)
        return self.default_frame
class VideoDummyTrack(VideoStreamTrack):
    """Video track that alternates between two frames every 50 recv() calls.

    Both frames start as blank 640x480 frames.  ``frame_remote`` is a plain
    attribute that other code may replace (consume_video2 assigns to a
    ``frame_remote`` attribute on the object it is given).
    """

    def __init__(self):
        dimensions = {'width': 640, 'height': 480}
        # Timestamp of the previous recv(); None until the first frame is sent.
        self.last = None
        # Number of frames handed out so far; drives the alternation.
        self.counter = 0
        self.frame_green = VideoFrame(**dimensions)
        self.frame_remote = VideoFrame(**dimensions)

    async def recv(self):
        """Wait for the 0.04 s pacing slot and return the frame for this tick."""
        self.last = await pause(self.last, 0.04)
        self.counter += 1
        # Within each 100 ticks: the first 50 use frame_green, the rest frame_remote.
        use_remote = (self.counter % 100) >= 50
        return self.frame_remote if use_remote else self.frame_green
async def consume_audio(track, call_id, local_id, remote_id):
    """Forever read audio frames from ``track`` and queue them for the remote peer.

    Every iteration sleeps 0.02 s, awaits one frame from ``track.recv()`` and,
    only if ``rtc_io[call_id][remote_id]['connected']`` is truthy, appends the
    frame to ``rtc_io[call_id][local_id]['audio']``.  Frames received while
    the remote participant is not connected are discarded.  The coroutine
    only ends by cancellation or when ``track.recv()`` raises.
    """
    while True:
        await asyncio.sleep(0.02)
        incoming = await track.recv()
        # Look the call up again on every frame: the rtc_io entry may be replaced.
        participants = rtc_io[call_id]
        if not participants[remote_id]["connected"]:
            continue
        participants[local_id]['audio'].append(incoming)
async def consume_video(track, call_id, local_id, remote_id):
    """Forever read video frames from ``track`` and queue them for the remote peer.

    Each received frame is appended to ``rtc_io[call_id][local_id]['video']``
    when ``rtc_io[call_id][remote_id]['connected']`` is truthy; otherwise it
    is discarded.  The coroutine only ends by cancellation or when
    ``track.recv()`` raises.

    The connection check is made per frame, as consume_audio does.
    Previously it was made once before the loop, so when the remote
    participant had not connected yet the coroutine returned immediately:
    the track was never drained and this participant's video was never
    relayed, even after the remote side joined.
    """
    while True:
        frame = await track.recv()
        if rtc_io[call_id][remote_id]["connected"]:
            rtc_io[call_id][local_id]['video'].append(frame)
async def consume_video2(track, local_video):
    """
    Drain incoming video and echo it back.

    Each frame awaited from ``track.recv()`` is stored on
    ``local_video.frame_remote``, replacing the previous one.  The loop only
    ends by cancellation or when ``track.recv()`` raises.
    """
    while True:
        latest = await track.recv()
        local_video.frame_remote = latest
async def index(request):
    """Serve ``index.html`` from the module directory as a ``text/html`` response.

    The file is re-read on every request.  It is opened in a ``with`` block so
    the handle is closed deterministically; the previous version left it open
    until garbage collection.
    """
    page_path = os.path.join(ROOT, 'index.html')
    with open(page_path, 'r') as page:
        html = page.read()
    return web.Response(content_type='text/html', text=html)
async def offer(request):
    """Answer a WebRTC offer and wire the peer into its call's relay queues.

    The JSON request body must contain:

    * ``offer``: an object with ``sdp`` and ``type``;
    * ``call_id``: a key of ``rtc_io``;
    * ``local_id``: this participant's key inside that call.

    A new peer connection is created and recorded in ``pcs``, and the
    participant is marked as connected.  For every incoming track the matching
    remote relay track is added to the connection and a consumer task is
    started that queues the incoming frames under ``local_id``.

    Returns:
        A JSON response with the ``sdp`` and ``type`` of the local answer.

    Raises:
        web.HTTPBadRequest: a required field is missing from the body.
        web.HTTPNotFound: the call or participant is unknown, or the call has
            no other participant.
    """
    data = await request.json()

    # Validate everything before a peer connection exists.  Previously a bad
    # request raised KeyError/IndexError (HTTP 500) after the connection had
    # already been appended to ``pcs``, leaking it until shutdown.
    try:
        offer_params = data['offer']
        sdp = offer_params['sdp']
        sdp_type = offer_params['type']
        call_id = data['call_id']
        local_id = data['local_id']
    except (KeyError, TypeError) as err:
        raise web.HTTPBadRequest(
            text='missing or malformed field: {}'.format(err)) from err

    call_dict = rtc_io.get(call_id)
    if call_dict is None or local_id not in call_dict:
        raise web.HTTPNotFound(text='unknown call or participant')

    # A call is expected to hold two participants; the remote one is the key
    # that is not ours.
    remote_id = next((peer for peer in call_dict if peer != local_id), None)
    if remote_id is None:
        raise web.HTTPNotFound(text='call has no remote participant')

    # Named ``description`` so the local no longer shadows this handler.
    description = RTCSessionDescription(sdp=sdp, type=sdp_type)

    pc = RTCPeerConnection()
    pc._consumers = []  # consumer tasks, cancelled by on_shutdown
    pcs.append(pc)

    call_dict[local_id]["connected"] = True

    # Tracks that play back what the remote participant has queued.
    remote_audio = AudioRemoteTrack(call_id, remote_id)
    remote_video = VideoRemoteTrack(call_id, remote_id)

    @pc.on('datachannel')
    def on_datachannel(channel):
        @channel.on('message')
        def on_message(message):
            # Every data-channel message is answered with a fixed reply.
            channel.send('pong')

    @pc.on('track')
    def on_track(track):
        if track.kind == 'audio':
            pc.addTrack(remote_audio)
            consumer = consume_audio(track, call_id, local_id, remote_id)
            pc._consumers.append(asyncio.ensure_future(consumer))
        elif track.kind == 'video':
            pc.addTrack(remote_video)
            consumer = consume_video(track, call_id, local_id, remote_id)
            pc._consumers.append(asyncio.ensure_future(consumer))

    await pc.setRemoteDescription(description)
    answer = await pc.createAnswer()
    await pc.setLocalDescription(answer)

    return web.Response(
        content_type='application/json',
        text=json.dumps({
            'sdp': pc.localDescription.sdp,
            'type': pc.localDescription.type
        }))
# Every peer connection created by the offer handler; closed in on_shutdown.
pcs = []


async def on_shutdown(app):
    """Cancel all consumer tasks, then close every peer connection in ``pcs``.

    ``app`` is accepted because this is registered as an application
    shutdown callback; it is not used.
    """
    # Stop the audio / video consumers of every connection first.
    consumers = [task for pc in pcs for task in pc._consumers]
    for task in consumers:
        task.cancel()

    # Then close all peer connections concurrently.
    await asyncio.gather(*(pc.close() for pc in pcs))
async def call(request):
    """Register a new call in memory and in the ``calls`` table.

    Query parameters:
        caller_id: participant id of the caller.
        callee_id: participant id of the callee.

    ``rtc_io`` gets a fresh entry for the call with empty audio/video queues
    for both participants, and a row is inserted into ``calls`` with type 2,
    the current ``time.time()`` as start time and status 1.

    Returns:
        A JSON response ``{"call_id": <id>}``.

    NOTE: the psycopg2 calls are synchronous, so they block the event loop
    for the duration of the database round trip.
    """
    # Read the parameters first so a bad request never opens a connection.
    caller_id = request.query['caller_id']
    callee_id = request.query['callee_id']

    # NOTE: the call id is currently fixed.  The original author left a
    # disabled alternative that derived it from md5(str(time.time())).
    call_inserted_id = 'call-1'

    conn = psycopg2.connect(connect_str)
    try:
        rtc_io[call_inserted_id] = {
            caller_id: {
                'audio': [],
                'video': [],
                'connected': False
            },
            callee_id: {
                'audio': [],
                'video': [],
                'connected': False
            },
        }

        # Values are passed as bound parameters rather than interpolated into
        # the statement: caller_id / callee_id come straight from the query
        # string, and string-built SQL allowed injection.
        cursor = conn.cursor()
        cursor.execute(
            """
            INSERT INTO calls (id,caller,callee,
            type,start_time,status)
            VALUES (%s,%s,%s,2,%s,1)
            returning id;
            """,
            (call_inserted_id, caller_id, callee_id, time.time()),
        )
        conn.commit()
    finally:
        # The connection used to be left open on every request.
        conn.close()

    return web.Response(
        content_type='application/json',
        text=json.dumps({
            'call_id': call_inserted_id
        }))
async def call_answer(request):
    """Record the callee's answer and reset the caller's queued audio.

    Query parameters:
        call_id: id of the call in the ``calls`` table and in ``rtc_io``.
        status: new value for the row's ``status`` column.

    The row's status is updated and committed, the call's ``caller`` is read
    back, and that participant's audio queue in ``rtc_io`` is replaced by an
    empty list.

    Returns:
        A JSON response ``{"status": "success"}``.

    Raises:
        web.HTTPNotFound: the call has no row in ``calls``, or it (or its
            caller) is not present in ``rtc_io``.

    NOTE: the psycopg2 calls are synchronous, so they block the event loop
    for the duration of the database round trip.
    """
    call_id = request.query["call_id"]
    status = request.query["status"]

    conn = psycopg2.connect(connect_str)
    try:
        cursor = conn.cursor()
        # Bound parameters instead of string-built SQL: both values come
        # straight from the query string and allowed injection.
        cursor.execute(
            """
            update calls
            set status = %s
            where id = %s;
            """,
            (status, call_id),
        )
        conn.commit()
        cursor.execute(
            """
            select caller
            from calls
            where id = %s;
            """,
            (call_id,),
        )
        row = cursor.fetchone()
    finally:
        # The connection used to be left open on every request.
        conn.close()

    # fetchone() returns None when no row matches; indexing it raised TypeError.
    if row is None:
        raise web.HTTPNotFound(text='unknown call')

    # rtc_io participant keys are strings (literals or query-string values),
    # while the driver may return the caller column as a non-string type, so
    # normalise before the lookup.
    caller_key = str(row[0])
    participants = rtc_io.get(call_id)
    if participants is None or caller_key not in participants:
        raise web.HTTPNotFound(text='call is not active')
    participants[caller_key]["audio"] = []

    return web.Response(
        content_type='application/json',
        text=json.dumps({
            'status': 'success'
        }))
if __name__ == '__main__':
    # Command-line options: listening port and optional debug logging.
    arg_parser = argparse.ArgumentParser(
        description='WebRTC audio / video / data-channels demo')
    arg_parser.add_argument(
        '--port',
        type=int,
        default=8080,
        help='Port for HTTP server (default: 8080)')
    arg_parser.add_argument('--verbose', '-v', action='count')
    cli_args = arg_parser.parse_args()

    # Any number of -v flags switches on DEBUG logging.
    if cli_args.verbose:
        logging.basicConfig(level=logging.DEBUG)

    app = web.Application()
    app.on_shutdown.append(on_shutdown)

    # GET endpoints, registered in the same order as before.
    for route_path, route_handler in (
            ('/call', call),
            ('/call_answer', call_answer),
            ('/', index)):
        app.router.add_get(route_path, route_handler)
    app.router.add_post('/offer', offer)

    # Listen on all interfaces.
    web.run_app(app, port=cli_args.port, host="0.0.0.0")