GoogleAPIClient Gmail Overview#

Overview#

本文档记录了我如何使用 Gmail API 来读取邮件内容并自动化发送邮件.

Examples#

test_gmail_client.py
 1# -*- coding: utf-8 -*-
 2
 3import json
 4from pathlib import Path
 5
 6import google.auth.exceptions
 7from google.auth.transport.requests import Request
 8from google.oauth2.credentials import Credentials
 9from google_auth_oauthlib.flow import InstalledAppFlow
10from googleapiclient.discovery import build
11from googleapiclient.errors import HttpError
12
13SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]
14
15dir_here = Path(__file__).absolute().parent
16path_temp_json = dir_here / "tmp.json"
17
18dir_home = Path.home()
19dir_google = dir_home / ".google"
20dir_google.mkdir(parents=True, exist_ok=True)
21
22path_client_secrets = (
23    dir_google / "send_and_receive_email_via_gmail_poc_client_secrets.json"
24)
25# If modifying these scopes, delete the file send_and_receive_email_via_gmail_poc_token.json.
26path_token = dir_google / "send_and_receive_email_via_gmail_poc_token.json"
27
28
29def auth():
30    creds = None
31    # The file token.json stores the user's access and refresh tokens, and is
32    # created automatically when the authorization flow completes for the first
33    # time.
34    if path_token.exists():
35        creds = Credentials.from_authorized_user_file(str(path_token), SCOPES)
36    # If there are no (valid) credentials available, let the user log in.
37    if not creds or not creds.valid:
38        need_re_auth = True
39        if creds and creds.expired and creds.refresh_token:
40            try:
41                creds.refresh(Request())
42                need_re_auth = False
43            except google.auth.exceptions.RefreshError as e:
44                pass
45
46        if need_re_auth:
47            flow = InstalledAppFlow.from_client_secrets_file(
48                str(path_client_secrets),
49                SCOPES,
50            )
51            creds = flow.run_local_server(port=0)
52        # Save the credentials for the next run
53        path_token.write_text(creds.to_json())
54
55    service = build("gmail", "v1", credentials=creds)
56    return service
57
58
59def list_threads(service):
60    res = service.users().messages().list(userId="me").execute()
61    path_temp_json.write_text(json.dumps(res, indent=4))
62
63
64def get_thread_details(service, thread_id: str):
65    res = service.users().threads().get(userId="me", id=thread_id).execute()
66    path_temp_json.write_text(json.dumps(res, indent=4))
67
68
69if __name__ == "__main__":
70    service = auth()
71    list_threads(service)
72    # get_thread_details(service, thread_id="18ec9466c9dc95e2")