🔥Building a Teams Bot with AI Capabilities - Part 6 - Reading Teams Attachments🔥
aka, file location and permissions aren't reliable, and are critically important. Great.
This blog series focuses on presenting complex DevOps projects as simple and approachable via plain language and lots of pictures. You can do it!
These articles are supported by readers, please consider subscribing to support me writing more of these articles <3 :)
This article is part of a series of articles, because 1 article would be absolutely massive.
Part 3: Delegated Permissions and Making Lambda Stateful for Oauth2
Part 4: Building the Receiver lambda to store tokens and state
Part 6 (this article): Finding attachments and reading them into Bedrock
Hey all!
In the last article we talked about how to find the root of messages to build our conversational context in Teams. That is an incredibly challenging task, as the Teams client is several raccoons in a trench coat (well, service APIs behind a front-end, same difference). Those services are generally:
Azure Entra/AD API - Authentication and Authorization (is Teams license assigned)
OneDrive API - personal files. Files are shared to the person they are sent to automatically in Teams.
There is some weird behavior because of this. When building a multi-person chat, and sharing files, files are shared to each person in the chat at the time of sharing the file. If a person is added later, they can see previous messages (Teams API) but not previous files (OneDrive API), as files are not batch retroactively shared. #WeirdTeams
SharePoint API - Team/Channel files. When files are uploaded to a Channel, they are shared with that Team automatically.
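As a rough illustration of how that split between services shows up in practice, here is a minimal sketch that guesses the storage backend from an attachment URL. The function name `guess_attachment_backend` is hypothetical, and the `-my.sharepoint.com/personal/` pattern for OneDrive is an assumption based on how OneDrive for Business URLs are commonly shaped, so treat it as a heuristic rather than a rule.

```python
from urllib.parse import urlsplit


def guess_attachment_backend(content_url: str) -> str:
    """Guess which service hosts a Teams attachment from its URL (heuristic only)."""
    parts = urlsplit(content_url)
    host = parts.netloc.lower()
    path = parts.path.lower()

    if not host.endswith(".sharepoint.com"):
        return "unknown"
    # Assumption: personal OneDrive sites live on "<tenant>-my.sharepoint.com/personal/..."
    if host.endswith("-my.sharepoint.com") and path.startswith("/personal/"):
        return "onedrive"
    # Team/Channel files live under a SharePoint site, e.g. ".../sites/<team name>/..."
    if path.startswith("/sites/"):
        return "sharepoint"
    return "unknown"
```

A guess like this is only useful for logging and for choosing which download strategy to try first; it says nothing about whether the caller actually has permission to read the file.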

To solve this, we have a couple of robust functions that find, download, and properly encode the attachments that need to be added to the conversation context.
Let’s walk through it.
Iterate Through the Messages
To start with, let’s catch up on what the last article covered. We went through the functions that fetch the metadata and messages around “channel” type conversations (those in Teams/Channels), “personal” type messages (those in DMs), and how we “massage” the messages to scrub any junk that Teams returns to us in those API calls. There’s a lot there, it’s worth a read if you’re skipping ahead.
But all you really need to know now is that we have a list of all conversation messages in the thread. For DMs, we just respond to the single request (I’d love to add “threading”/“grouping of topics” in DM context in the future, but haven’t found an elegant way yet), and for channels we read back all the messages to the root/parent message.
So that covers us up to line 11.
On lines 11 and 12, we initialize the conversation and content lists, which we’ll populate iteratively.
On line 15, we iterate through each message package that we’ve previously built to build conversation turns - the sort of atomic unit of a conversation that Bedrock wants to read. We’ll walk through this more as we dig deeper into this function.
We take that turn (line 17) and append it to the conversation (line 19).
Once we have a complete conversation built in Bedrock’s style, we return it to the parent, which will do the Bedrock conversation.
For now, let’s dig into how the build_conversation_turn() function works.
```python
def get_teams_conversation_history(user_graph_auth_token, event_body):
    # ...
    if conversation_type == "channel":
        # ...
    elif conversation_type == "personal":
        # ...
    # This excludes the bot's loading messages, the authentication card, and any messages which are invalid or blank
    messages = massage_messages(messages, conversation_type)
    # Initialize conversation and content lists
    conversation = []
    content = []
    # Iterate through messages and build conversation
    for message in messages:
        # Iterate over content
        conversation_turn = build_conversation_turn(content, message, headers)
        # Append the conversation turn to the conversation
        conversation.append(conversation_turn)
    return conversation
```
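To make the shape of the result concrete, here is a self-contained sketch of that loop. The `build_turn` stub and the sample messages are hypothetical stand-ins: the real `build_conversation_turn()` also handles attachments, and the exact dictionary layout it returns is assumed here from the description in this article.

```python
def build_turn(message: dict) -> dict:
    """Hypothetical, simplified stand-in for build_conversation_turn()."""
    user = (message.get("from") or {}).get("user") or {}
    sender_name = user.get("displayName")
    text = message.get("body", {}).get("content", "").strip()
    if sender_name:
        # Messages with a user display name are treated as human turns
        return {"role": "user", "content": [{"text": f"{sender_name} says: {text}"}]}
    # Everything else is assumed to come from the bot
    return {"role": "assistant", "content": [{"text": text}]}


def build_conversation(messages: list) -> list:
    """Turn an ordered list of Teams messages into a list of conversation turns."""
    return [build_turn(message) for message in messages]


sample_messages = [
    {"from": {"user": {"displayName": "Kyler Middleton"}}, "body": {"content": "Tell me the weather"}},
    {"from": {"user": None}, "body": {"content": "I can't see outside, sorry."}},
]
conversation = build_conversation(sample_messages)
```

Each entry in `conversation` carries a role plus a list of content blocks, which is the list that attachment blocks get appended to later on.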
Every single message from a user has from.user.displayName, and if it exists, we set the sender role to “user”. Roles in Bedrock can be either “user” (not from the model) or “assistant” (from the model).
On line 7, if there is no displayName, we assume it’s the assistant.
On line 10, we get the message.body.content, which is the text of the message. Teams annoyingly wraps it in HTML markup, so we strip as much of that with simple regex as we can, lines 13-14. I think Bedrock can interpret this fine, but it annoys the humans (read: me) when reading debug logs, so I remove it anyway.
On line 17 we start building the content block, making sure to include the sender name and text, something like:
“Kyler Middleton says: Tell me the weather”
And if there’s no sender_name (it’s the bot), we just include the text. We’ll make sure to set the role later on.
```python
def build_conversation_turn(content, message, headers):
    # Get sender name; a missing user display name means the message came from the bot
    try:
        sender_name = message["from"]["user"]["displayName"]
        sender_role = "user"
    except (KeyError, TypeError):
        sender_name = None
        sender_role = "assistant"
    # Get text
    content_text = message.get("body", {}).get("content", "").strip()
    # Clean text, Teams adds a lot of extra html like this: "text": "Kyler Middleton says: <p>Hey Vera, what's up today? </p>"
    content_text = re.sub(r"<[^>]+>", "", content_text)
    # Replace HTML non-breaking space entities with regular spaces
    content_text = content_text.replace("&nbsp;", " ")
    # Set the content block
    if sender_role == "user":
        content = [
            {
                "text": f"{sender_name} says: {content_text}",
            }
        ]
    else:
        content = [
            {
                "text": f"{content_text}",
            }
        ]
```
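For anyone who wants to try the text cleanup in isolation, here is a small standalone sketch. It leans on `html.unescape` from the standard library to decode entities, which is a swap-in not used in the original function, and the helper name `clean_teams_html` is hypothetical.

```python
import html
import re


def clean_teams_html(raw: str) -> str:
    """Strip HTML tags and decode entities from a Teams message body."""
    text = re.sub(r"<[^>]+>", "", raw)  # drop tags such as <p> and </p>
    text = html.unescape(text)          # decode entities such as &amp; and &nbsp;
    text = text.replace("\xa0", " ")    # non-breaking spaces become regular spaces
    return text.strip()
```

A regex like this is fine for tidying debug logs, but it is not a real HTML parser, so unusual markup (a literal `<` in a message, for example) may still slip through or get trimmed.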
Downloading the Attachments
Next we check for any attachments in the message. This isn’t a straightforward process (obviously, since it gets its own article in this series).
The attachment always has a file name (file.name) and a file_extension (everything after the last period in the file name), line 8.
We check for an attribute called file.contentUrl, which is the URL of the content. There are different ways to authenticate and build the URL to get the file (I miss the simplicity of Slack attachments dearly here), so we pass that URL to the download_file_for_user() function, line 14. We’ll dig into that shortly.
There are a couple of other gotchas here we’ll address before we dive into downloading. Bedrock has pretty strict requirements on document names, so we filter the name through a regex, line 18. We’re only permitted single spaces (weird?), so we regex that as well, line 20. Leading and trailing spaces aren’t permitted either, so we remove those on line 22.
Many of these text manipulations could be combined into single lines, but I abhor complex code. DRY is awful, I’d much rather understand exactly what code is doing.
```python
def build_conversation_turn(content, message, headers):
    #...
    # Iterate over attachments
    if "attachments" in message:
        for file in message["attachments"]:
            # Isolate name of the file and remove characters before the final period
            file_name = file["name"]
            file_extension = file["name"].split(".")[-1]
            # File is a supported type
            file_url = file["contentUrl"]
            # Download the file from the content URL. Include headers in case the file is protected (SharePoint)
            file_content = download_file_for_user(file_name, file_url, headers)
            ### Bedrock API has strict file name reqs: document file name can only contain alphanumeric characters, whitespace characters, hyphens, parentheses, and square brackets. The name can't contain more than one consecutive whitespace character.
            # Remove disallowed
            file_name = re.sub(r"[^a-zA-Z0-9\s\-\[\]\(\)]", "", file_name)
            # Only single spaces allowed
            file_name = re.sub(r"\s{2,}", " ", file_name)
            # Strip leading and trailing whitespace
            file_name = file_name.strip()
```
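Here is one possible standalone variation of the name cleanup, written as a sketch rather than the function used above. It splits the extension off first, so the extension does not get glued onto the name once the period is stripped, and it copes with names that have no period at all. The helper name `split_and_sanitize` is hypothetical.

```python
import re


def split_and_sanitize(file_name: str) -> tuple:
    """Return (bedrock_safe_name, extension) for an attachment file name."""
    stem, dot, extension = file_name.rpartition(".")
    if not dot:
        # No period at all: treat the whole thing as the name, with no extension
        stem, extension = file_name, ""
    safe = re.sub(r"[^a-zA-Z0-9\s\-\[\]\(\)]", "", stem)  # remove disallowed characters
    safe = re.sub(r"\s{2,}", " ", safe)                   # collapse runs of whitespace
    return safe.strip(), extension.lower()
```

For example, `split_and_sanitize("Q3  report_final (v2).pdf")` gives a name with the underscore removed and the double space collapsed, plus a lowercase `pdf` extension that can be checked against the supported formats.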
Pain
It’s worth including the note for this function. All my functions have notes that describe broadly what they do, but this one is largely an apology on behalf of Teams’ APIs. I don’t know why the Teams… Team decided to make all of this such a pain to work with, but I hope they stub their toes walking to their beds after turning out the lights.
This function is more than 100 lines of code, so buckle up 😭
First, on line 9, we wrap all of this in a try. Why? Because we’re doing an extraordinary amount of logic here, and if any of it fails we want our script to keep working. If we can’t fetch the attachment, we just continue with the text only. That’s just fine.
Next, we check whether the file_url contains sharepoint.com/sites/, which means the file is stored in SharePoint. We need to fetch the file from SharePoint, which turns out to be an incredibly annoying thing to do.
To begin, we need the site_id. Let’s build a Graph call to go fetch that. We establish the primitives by splitting the URL, lines 13-17, and then build the lookup URL, line 20. We send the request via requests, line 21, and check the call, line 22. If it worked, we set the site_id to the response’s id, line 23.
With that in hand, we need the default document library ID, so we build a URL to fetch THAT, lines 26-29.
Then we need to extract the file path. These sometimes work just fine, and sometimes contain stray Unicode characters (why the heck?). We use a URL encoding function to do the heavy lifting of encoding the full file path plus document name (spaces and special characters get encoded a bit weirdly in HTTP), line 32.
Then we FINALLY finally try to download the file, line 34.
```python
# Download file for user to bot
# I apologize to future developers who have to maintain this code.
# I can only blame the byzantine nature of the Microsoft Graph API and Teams API,
# which stores files in whatever location it feels like, and then
# provides a content URL that may or may not work.
def download_file_for_user(file_name, file_url, headers):
    # If file is shared in a Team channel, the file is stored in sharepoint for the Team
    # We need to decode and fetch a great deal of info about sharepoint to fetch the file
    try:
        if ".sharepoint.com/sites/" in file_url:
            # Parse SharePoint URL
            parts = file_url.split(".sharepoint.com/sites/")
            hostname = parts[0].replace("https://", "") + ".sharepoint.com"
            site_path_and_file = parts[1]
            site_name = site_path_and_file.split("/")[0]
            file_relative_path_raw = site_path_and_file[len(site_name)+1:]
            # Get the SharePoint site ID
            site_lookup_url = f"https://graph.microsoft.com/v1.0/sites/{hostname}:/sites/{site_name}"
            site_resp = requests.get(site_lookup_url, headers=headers)
            site_resp.raise_for_status()
            site_id = site_resp.json()["id"]
            # Get default document library (drive) ID
            drive_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drive"
            drive_resp = requests.get(drive_url, headers=headers)
            drive_resp.raise_for_status()
            drive_id = drive_resp.json()["id"]
            # Try the exact file path, there are sometimes invalid unicode characters like \u202f embedded
            file_path_encoded = urllib.parse.quote(file_relative_path_raw, safe="/")
            graph_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives/{drive_id}/root:/{file_path_encoded}:/content"
            resp = requests.get(graph_url, headers=headers)
            if resp.status_code == 200:
                print(f"🟢 Download succeeded from SharePoint site using exact path: {graph_url}")
                return resp.content
            else:
                print(f"🚫 Exact path download failed: {resp.status_code} - {resp.text}")
```
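The URL-splitting step can be exercised on its own without touching the network. The sketch below is a hypothetical helper (`parse_sharepoint_url` is not part of the original code) that uses `urllib.parse` instead of string splitting. It decodes the path before re-encoding it, on the assumption that a content URL may arrive already percent-encoded, in which case encoding it again would turn `%20` into `%2520`.

```python
from urllib.parse import quote, unquote, urlsplit


def parse_sharepoint_url(file_url: str) -> dict:
    """Split a '.../sites/<site>/<path>' URL into the pieces the Graph calls need."""
    parts = urlsplit(file_url)
    segments = parts.path.lstrip("/").split("/", 2)  # ["sites", "<site>", "<rest>"]
    if len(segments) < 3 or segments[0] != "sites":
        raise ValueError(f"Not a SharePoint site file URL: {file_url}")
    site_name = segments[1]
    # Decode first so an already-encoded "%20" is not encoded a second time
    relative_path = unquote(segments[2])
    return {
        "hostname": parts.netloc,
        "site_name": site_name,
        "relative_path": relative_path,
        "site_lookup_url": f"https://graph.microsoft.com/v1.0/sites/{parts.netloc}:/sites/{site_name}",
        "encoded_path": quote(relative_path, safe="/"),
    }
```

Keeping the parsing separate from the HTTP calls makes it easy to log exactly which path was attempted when a download fails.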
I noticed quite a few of my test downloads had a “\u202f” which is the unicode character encoding for a “narrow non-breaking space”. Why are those in the file path? I have absolutely no idea dude. But we have a whole block where we normalize the path, swap that character for a regular space, and see if we can download the file again.
```python
def download_file_for_user(file_name, file_url, headers):
    try:
        if ".sharepoint.com/sites/" in file_url:
            # ...
            # Try to access the file after normalizing the path
            normalized_path = unicodedata.normalize("NFKC", file_relative_path_raw).replace('\u202f', ' ')
            if normalized_path != file_relative_path_raw:
                normalized_encoded = urllib.parse.quote(normalized_path, safe="/")
                graph_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives/{drive_id}/root:/{normalized_encoded}:/content"
                print(f"🟢 Attempting normalized path download URL: {graph_url}")
                resp = requests.get(graph_url, headers=headers)
                if resp.status_code == 200:
                    print(f"🟢 Download succeeded from SharePoint site using normalized path: {graph_url}")
                    return resp.content
                else:
                    print(f"🚫 Normalized path download failed: {resp.status_code} - {resp.text}")
```
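The normalization step is easy to see in isolation. The sketch below uses a made-up file name, and `normalize_path` is a hypothetical helper name. One possible source of these characters, offered as a guess rather than something verified against these files: recent macOS releases put U+202F before AM/PM in formatted times, including default screenshot names.

```python
import unicodedata
from urllib.parse import quote


def normalize_path(raw_path: str) -> str:
    """Fold compatibility characters (such as U+202F) into their plain equivalents."""
    # NFKC maps U+202F to a regular space; the replace() is a belt-and-braces fallback
    return unicodedata.normalize("NFKC", raw_path).replace("\u202f", " ")


raw = "General/Screenshot 9.41\u202fAM.png"  # hypothetical path with a narrow no-break space
normalized = normalize_path(raw)
encoded = quote(normalized, safe="/")
```

Comparing `normalized` to `raw` before retrying, as the function above does, avoids a pointless second request when the path had nothing unusual in it.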
If that doesn’t work we want to throw our hands up and retire, but there’s one more method we can use - literally fricking searching SharePoint using graph for the file name, then reading through all the results, one by one, to see if any exactly match the file name.
This is a slow, serialized, and generally bad way to get this working. However, it often works, so ¯\_(ツ)_/¯
If that still fails, we just give up for SharePoint paths. I don’t know another method to find the file, other than the SharePoint/Teams API devs simply providing a link to the file (please goddess).
```python
def download_file_for_user(file_name, file_url, headers):
    # ...
    try:
        if ".sharepoint.com/sites/" in file_url:
            # ...
            # If that doesn't work, do naive search of all files in the Teams sharepoint site
            # If matching file found, download it
            # This is a slow hack, but is the only method reliably working to access Teams/SharePoint hosted files
            search_url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/search(q='{urllib.parse.quote(file_name)}')"
            print(f"🟢 Attempting fallback file search: {search_url}")
            search_response = requests.get(search_url, headers=headers)
            if search_response.status_code == 200:
                for item in search_response.json().get("value", []):
                    if item.get("name") == file_name:
                        item_id = item["id"]
                        download_url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{item_id}/content"
                        file_response = requests.get(download_url, headers=headers)
                        if file_response.status_code == 200:
                            print(f"🟢 Download succeeded from SharePoint search by file name: {download_url}")
                            return file_response.content
                        else:
                            print(f"🚫 SharePoint item download failed: {file_response.status_code} - {file_response.text}")
```
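The exact-match filter matters because a search can return near misses as well as the file we want. Here is a small offline sketch of just that filtering step; `pick_exact_match` is a hypothetical helper, and the response body is a made-up example trimmed to the two fields the code above reads.

```python
from typing import Optional


def pick_exact_match(search_results: dict, file_name: str) -> Optional[str]:
    """Return the id of the first search hit whose name equals file_name, else None."""
    for item in search_results.get("value", []):
        if item.get("name") == file_name:
            return item.get("id")
    return None


# Hypothetical response body, trimmed to the fields used here
fake_results = {
    "value": [
        {"id": "01ABC", "name": "notes-draft.pdf"},
        {"id": "01DEF", "name": "notes.pdf"},
    ]
}
```

With that data, asking for `notes.pdf` skips the draft and returns the second item's id. Note that two files with the same name in different folders would still be ambiguous, and this sketch simply takes the first one.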
Next up, we try to download the file from the user’s personal OneDrive. This works if the user has uploaded the file themselves in a DM context.
Again, this is a relatively “bad” solution since it requires our own user to have uploaded the file. If the file was uploaded in a DM context by another user, I think we just won’t find it. Sigh.
```python
def download_file_for_user(file_name, file_url, headers):
    try:
        if ".sharepoint.com/sites/" in file_url:
            #...
        # If files are shared in a Teams chat, they are stored in the user's OneDrive under "Microsoft Teams Chat Files"
        # This is the method most often used for 1:1 chats or group chats
        try:
            base_path = f"Microsoft Teams Chat Files/{file_name}"
            encoded_path = urllib.parse.quote(base_path, safe="/")
            graph_url = f"https://graph.microsoft.com/v1.0/me/drive/root:/{encoded_path}:/content"
            response = requests.get(graph_url, headers=headers)
            if response.status_code == 200:
                print(f"🟢 Download succeeded from user's OneDrive: {graph_url}")
                return response.content
            else:
                print(f"🚫 OneDrive download failed: {response.status_code} - {response.text}")
        except Exception as e:
            print(f"🚫 OneDrive download error: {str(e)}")
```
Lastly, we try to directly download the file via the file_url. You’d think this would be the most reliable method for downloading files. After all, this is the only method we use for Slack and it works 100% of the time.
However, I have never seen this work in my testing and live auditing. I have no idea why, but I keep it because, well, it SHOULD work, right? And maybe it’ll catch some edge case that I haven’t tested yet.
```python
def download_file_for_user(file_name, file_url, headers):
    try:
        if ".sharepoint.com/sites/" in file_url:
            #...
        #...
        # Last resort, try to directly download the file from the content URL
        # This sometimes works in 1:1 contexts, but fails so often it's our last resort
        try:
            response = requests.get(file_url, headers=headers)
            if response.status_code == 200:
                print(f"🟢 Download succeeded from direct URL: {file_url}")
                return response.content
            else:
                print(f"🚫 Direct URL download failed: {response.status_code} - {response.text}")
        except Exception as e:
            print(f"🚫 Direct URL download error: {str(e)}")
    except Exception as e:
        # Assumed handler: the excerpt does not show how the outer try is closed
        print(f"🚫 Download error: {str(e)}")
    print(f"🚫 Could not download file {file_name}")
    return None
```
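One more avenue that is not part of the function above and has not been tested against Teams attachments here: Microsoft Graph documents a `/shares` endpoint that accepts an encoded URL in place of a site ID, drive ID, and path. Whether it accepts the contentUrl values that Teams returns is an open question, so treat this purely as a sketch of the documented encoding. The helper names are hypothetical.

```python
import base64


def encode_sharing_url(url: str) -> str:
    """Encode a URL into the 'u!...' token accepted by the Graph /shares endpoint."""
    b64 = base64.b64encode(url.encode("utf-8")).decode("ascii")
    # Make it URL-safe: drop '=' padding, then swap '/' for '_' and '+' for '-'
    return "u!" + b64.rstrip("=").replace("/", "_").replace("+", "-")


def shares_content_url(url: str) -> str:
    """Build the Graph URL that would download the item behind `url`."""
    return f"https://graph.microsoft.com/v1.0/shares/{encode_sharing_url(url)}/driveItem/content"
```

If it does work for a given tenant, it could slot in as another fallback ahead of the slow search, using the same `headers` as the other Graph calls.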
Finish the Conversation
After that, the logic is pretty much the same as the Slack Vera bot - we attach the raw bytes of the files to the conversation turns, pack each one with metadata to identify it as “user” or “assistant”, and pass it to the model.
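As a rough sketch of what attaching a file to a turn can look like, the block below wraps downloaded bytes in the image and document content block shapes used by the Bedrock Converse API. The helper name `build_attachment_block` is hypothetical, and the format lists are an assumption based on the Converse documentation at the time of writing, so check them against the current docs before relying on them.

```python
from typing import Optional

# Assumption: formats accepted by the Converse API; this list may change over time
DOCUMENT_FORMATS = {"pdf", "csv", "doc", "docx", "xls", "xlsx", "html", "txt", "md"}
IMAGE_FORMATS = {"png", "jpeg", "gif", "webp"}


def build_attachment_block(name: str, extension: str, data: bytes) -> Optional[dict]:
    """Wrap downloaded bytes in a Converse-style content block, or None if unsupported."""
    fmt = "jpeg" if extension.lower() == "jpg" else extension.lower()
    if fmt in IMAGE_FORMATS:
        return {"image": {"format": fmt, "source": {"bytes": data}}}
    if fmt in DOCUMENT_FORMATS:
        return {"document": {"format": fmt, "name": name, "source": {"bytes": data}}}
    return None


turn = {"role": "user", "content": [{"text": "Kyler Middleton says: Summarize this"}]}
block = build_attachment_block("notes", "pdf", b"%PDF-1.7 ...")
if block is not None:
    # Unsupported or failed downloads are skipped, leaving a text-only turn
    turn["content"].append(block)
```

Returning `None` for unsupported types matches the approach described earlier: if an attachment can't be used, the turn simply goes to the model as text.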
The model returns a response, and we pass it to the Teams chat as a response.
It’s quite a bit slower than the Slack Vera, largely due to the vastly greater number of API requests required to do pretty much anything - finding the conversation ID, mapping different types of identifiers together, finding and testing all different paths for downloading files, etc.
It’s hard to beat Teams’ price (FREE, or at least table stakes in your MSFT EA agreements), but it remains a bear to work with. Thank goodness it’s improving, it was utterly awful a few years ago.
Summary
We dug even further into the idiosyncrasies of the various APIs that Teams uses, and how they basically operate like raccoons in a trench coat - lots of loosely collected APIs built to different standards and with different methodologies, trying to operate as a single entity. It sometimes even works properly.
That’s probably it for the Teams Vera series. I’ve discovered Strands, an open source tool from AWS that permits building MCP-enabled Agents to do stuff in a standard way, and I’m entirely hooked. I’m barely keeping my non-AI job right now, this stuff is just too cool.
My next articles will surely follow that trend.
Good luck out there.
kyler

