GoogleAPIClient Gmail Overview

Overview

本文档记录了我如何使用 Gmail API 来读取邮件内容并自动化发送邮件。下面的示例只申请了只读权限 (gmail.readonly), 仅演示认证和读取部分。

Examples

test_gmail_client.py
 1# -*- coding: utf-8 -*-
 2
 3import json
 4from pathlib import Path
 5
 6from google.auth.transport.requests import Request
 7from google.oauth2.credentials import Credentials
 8from google_auth_oauthlib.flow import InstalledAppFlow
 9from googleapiclient.discovery import build
10from googleapiclient.errors import HttpError
11
# OAuth scope requested during authorization: read-only Gmail access.
SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]

# Scratch file next to this script; the API helpers below write their JSON
# responses into it.
dir_here = Path(__file__).absolute().parent
path_temp_json = dir_here.joinpath("tmp.json")

# Directory under the user's home that holds the OAuth files. It is created
# at import time if it does not exist yet.
dir_home = Path.home()
dir_google = dir_home.joinpath(".google")
dir_google.mkdir(parents=True, exist_ok=True)

# Client secrets file handed to InstalledAppFlow.from_client_secrets_file().
path_client_secrets = dir_google.joinpath(
    "send_and_receive_email_via_gmail_poc_client_secrets.json"
)
# Cached user credentials.
# If modifying SCOPES, delete the file send_and_receive_email_via_gmail_poc_token.json.
path_token = dir_google.joinpath("send_and_receive_email_via_gmail_poc_token.json")
26
27
def auth():
    """
    Authenticate against Google and return a Gmail API (v1) service object.

    Cached credentials are loaded from ``path_token`` when that file exists.
    If no valid credentials are available, expired credentials that carry a
    refresh token are refreshed; otherwise the local-server OAuth flow is run
    using ``path_client_secrets``. In both cases the resulting credentials
    are written back to ``path_token`` for the next run.

    :return: the object returned by ``build("gmail", "v1", credentials=creds)``.
    """
    creds = None
    # The token file stores the user's access and refresh tokens, and is
    # created automatically when the authorization flow completes for the
    # first time. It does not exist before that, so test for the file on
    # disk: a Path object itself is always truthy, which would send the first
    # run straight into a FileNotFoundError.
    if path_token.exists():
        creds = Credentials.from_authorized_user_file(str(path_token), SCOPES)
    # If there are no (valid) credentials available, let the user log in.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                str(path_client_secrets),
                SCOPES,
            )
            # port=0 is passed through to run_local_server unchanged.
            creds = flow.run_local_server(port=0)
        # Save the credentials for the next run
        path_token.write_text(creds.to_json())

    service = build("gmail", "v1", credentials=creds)
    return service
50
51
def list_threads(service):
    """
    Call ``users().messages().list(userId="me")`` and dump the response as
    indented JSON into ``path_temp_json`` (overwriting its previous content).

    NOTE(review): despite the function name, the request goes to the
    ``messages`` collection, not ``threads`` -- confirm which one is intended.

    :param service: Gmail API service object, e.g. the one returned by ``auth()``.
    """
    request = service.users().messages().list(userId="me")
    response = request.execute()
    payload = json.dumps(response, indent=4)
    path_temp_json.write_text(payload)
55
56
def get_thread_details(service, thread_id: str):
    """
    Call ``users().threads().get(userId="me", id=thread_id)`` and dump the
    response as indented JSON into ``path_temp_json`` (overwriting its
    previous content).

    :param service: Gmail API service object, e.g. the one returned by ``auth()``.
    :param thread_id: value passed as ``id`` to ``threads().get``.
    """
    request = service.users().threads().get(userId="me", id=thread_id)
    response = request.execute()
    payload = json.dumps(response, indent=4)
    path_temp_json.write_text(payload)
60
61
if __name__ == "__main__":
    # Script entry point: authenticate once, then run both example calls.
    # Each call overwrites tmp.json, so the file ends up holding the thread
    # details from the second call.
    gmail_service = auth()
    list_threads(gmail_service)
    get_thread_details(gmail_service, thread_id="18ec9466c9dc95e2")