Mock Teams connections to test services that work with teams, channels, chats, and presence.
Note: If you're new to unit testing with Zato, check the tutorial first.
The service:
from zato.server.service import Service
class GetMyTeams(Service):
    """Return the name and ID of each team returned by get_my_teams()."""

    name = 'teams.get-my-teams'

    def handle(self):
        """Query the 'ms365.Teams' connection and store the teams in the response payload."""
        connection = self.cloud.ms365.get('ms365.Teams').conn

        with connection.client() as client:
            teams_api = client.impl.teams()

            # Keep only the two fields that this service exposes.
            summaries = []
            for team in teams_api.get_my_teams():
                summaries.append({'name': team['name'], 'id': team['id']})

            self.response.payload = {'teams': summaries}
The test:
from zato_testing import ServiceTestCase
from myapp.services import GetMyTeams
class TestGetMyTeams(ServiceTestCase):
    """Run GetMyTeams against canned 'ms365.Teams' responses."""

    def test_returns_teams(self):
        """Three teams are returned and the first one is 'Engineering'."""
        mocked_teams = [
            {'name': 'Engineering', 'id': 'team-001'},
            {'name': 'Marketing', 'id': 'team-002'},
            {'name': 'Sales', 'id': 'team-003'},
        ]

        # Canned responses for the 'ms365.Teams' connection.
        self.set_response('ms365.Teams.teams', {'id': 'teams-api'})
        self.set_response('ms365.Teams.teams.get_my_teams', mocked_teams)

        result = self.invoke(GetMyTeams)
        returned = result.response.payload['teams']

        self.assertEqual(len(returned), 3)
        self.assertEqual(returned[0]['name'], 'Engineering')
The service:
class GetTeamChannels(Service):
    """Return the name and ID of each channel of the team given by team_id."""

    name = 'teams.channels.get'
    input = 'team_id'

    def handle(self):
        """Look up the team by its ID and store its channels in the response payload."""
        connection = self.cloud.ms365.get('ms365.Teams').conn

        with connection.client() as client:
            team = client.impl.teams().get_team(self.request.input.team_id)

            # Reduce every channel to its name and ID.
            found = [
                {'name': channel['name'], 'id': channel['id']}
                for channel in team.get_channels()
            ]

            self.response.payload = {'channels': found}
The test:
class TestGetTeamChannels(ServiceTestCase):
    """Run GetTeamChannels against canned 'ms365.Teams' responses."""

    def test_returns_channels(self):
        """Three channels are returned for team 'team-001'."""
        mocked_team = {'id': 'team-001', 'name': 'Engineering'}
        mocked_channels = [
            {'name': 'General', 'id': 'channel-001'},
            {'name': 'Development', 'id': 'channel-002'},
            {'name': 'Code Reviews', 'id': 'channel-003'},
        ]

        # Canned responses for the API object, the team, and its channels.
        self.set_response('ms365.Teams.teams', {'id': 'teams-api'})
        self.set_response('ms365.Teams.teams.get_team', mocked_team)
        self.set_response('ms365.Teams.teams.get_team.get_channels', mocked_channels)

        result = self.invoke(GetTeamChannels, team_id='team-001')
        returned = result.response.payload['channels']

        self.assertEqual(len(returned), 3)
class TestSendChannelMessage(ServiceTestCase):
    """Run SendChannelMessage against canned 'ms365.Teams' responses."""

    def test_sends_message(self):
        """The payload reports sent=True and carries the mocked message ID."""
        mocked_team = {'id': 'team-001', 'name': 'Engineering'}
        mocked_channel = {'id': 'channel-001', 'name': 'General'}
        mocked_message = {'id': 'msg-001'}

        # Canned responses for the API object, the team, the channel, and the sent message.
        self.set_response('ms365.Teams.teams', {'id': 'teams-api'})
        self.set_response('ms365.Teams.teams.get_team', mocked_team)
        self.set_response('ms365.Teams.teams.get_team.get_channel', mocked_channel)
        self.set_response('ms365.Teams.teams.get_team.get_channel.send_message', mocked_message)

        request = {
            'team_id': 'team-001',
            'channel_id': 'channel-001',
            'message': 'Hello team!',
        }
        result = self.invoke(SendChannelMessage, **request)
        payload = result.response.payload

        self.assertEqual(payload['sent'], True)
        self.assertEqual(payload['message_id'], 'msg-001')
class TestGetMyChats(ServiceTestCase):
    """Run GetMyChats against canned 'ms365.Teams' responses."""

    def test_returns_chats(self):
        """Three chats are returned and the second one is of type 'group'."""
        mocked_chats = [
            {'id': 'chat-001', 'chat_type': 'oneOnOne'},
            {'id': 'chat-002', 'chat_type': 'group'},
            {'id': 'chat-003', 'chat_type': 'meeting'},
        ]

        # Canned responses for the API object and the chat list.
        self.set_response('ms365.Teams.teams', {'id': 'teams-api'})
        self.set_response('ms365.Teams.teams.get_my_chats', mocked_chats)

        result = self.invoke(GetMyChats)

        self.assertEqual(len(result.response.payload['chats']), 3)
        self.assertEqual(result.response.payload['chats'][1]['chat_type'], 'group')
class TestSendChatMessage(ServiceTestCase):
    """Run SendChatMessage against canned 'ms365.Teams' responses."""

    def test_sends_chat_message(self):
        """The payload reports sent=True and carries the mocked message ID."""
        mocked_chat = {'id': 'chat-001', 'chat_type': 'oneOnOne'}
        mocked_message = {'id': 'msg-chat-001'}

        # Canned responses for the API object, the chat, and the sent message.
        self.set_response('ms365.Teams.teams', {'id': 'teams-api'})
        self.set_response('ms365.Teams.teams.get_chat', mocked_chat)
        self.set_response('ms365.Teams.teams.get_chat.send_message', mocked_message)

        result = self.invoke(SendChatMessage, chat_id='chat-001', content='Hello!')
        payload = result.response.payload

        self.assertEqual(payload['sent'], True)
        self.assertEqual(payload['message_id'], 'msg-chat-001')
class TestGetMyPresence(ServiceTestCase):
    """Run GetMyPresence against canned 'ms365.Teams' responses."""

    def test_returns_presence(self):
        """The payload reports the mocked availability, 'Available'."""
        mocked_presence = {
            'availability': 'Available',
            'activity': 'Available',
        }

        # Canned responses for the API object and the presence lookup.
        self.set_response('ms365.Teams.teams', {'id': 'teams-api'})
        self.set_response('ms365.Teams.teams.get_my_presence', mocked_presence)

        result = self.invoke(GetMyPresence)

        self.assertEqual(result.response.payload['availability'], 'Available')
class TestSetMyPresence(ServiceTestCase):
    """Run SetMyPresence against canned 'ms365.Teams' responses."""

    def test_sets_presence(self):
        """The payload reports updated=True after the presence is set."""
        mocked_presence = {
            'availability': 'Busy',
            'activity': 'InACall',
        }

        # Canned responses for the API object and the presence update.
        self.set_response('ms365.Teams.teams', {'id': 'teams-api'})
        self.set_response('ms365.Teams.teams.set_my_user_preferred_presence', mocked_presence)

        result = self.invoke(SetMyPresence, availability='Busy', activity='InACall')

        self.assertEqual(result.response.payload['updated'], True)