GoogleAPIClient Gmail Overview#
Overview#
本文档记录了我如何使用 Gmail API 来读取邮件内容并自动化发送邮件。
Examples#
test_gmail_client.py
1# -*- coding: utf-8 -*-
2
3import json
4from pathlib import Path
5
6import google.auth.exceptions
7from google.auth.transport.requests import Request
8from google.oauth2.credentials import Credentials
9from google_auth_oauthlib.flow import InstalledAppFlow
10from googleapiclient.discovery import build
11from googleapiclient.errors import HttpError
12
# OAuth scopes requested when authorizing against the Gmail API.
SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]

# Scratch file, located next to this script, that receives raw API responses.
dir_here = Path(__file__).absolute().parent
path_temp_json = dir_here.joinpath("tmp.json")

# Credential files are kept in ~/.google; the folder is created at import time
# if it does not exist yet.
dir_home = Path.home()
dir_google = dir_home.joinpath(".google")
dir_google.mkdir(parents=True, exist_ok=True)

path_client_secrets = dir_google.joinpath(
    "send_and_receive_email_via_gmail_poc_client_secrets.json"
)
# If modifying these scopes, delete the file send_and_receive_email_via_gmail_poc_token.json.
path_token = dir_google.joinpath("send_and_receive_email_via_gmail_poc_token.json")
27
28
def auth():
    """
    Build an authorized Gmail API service object.

    Cached user credentials are loaded from ``path_token`` when that file
    exists. If they are missing or not valid, an expired credential that has
    a refresh token is refreshed first; when that is not possible, or the
    refresh raises ``RefreshError``, the installed-app OAuth flow is run from
    ``path_client_secrets``. The credentials obtained either way are then
    written back to ``path_token`` for the next run.

    :return: the object returned by ``build("gmail", "v1", ...)``.
    """
    # The token file stores the user's access and refresh tokens; it is
    # created the first time the authorization flow completes.
    credentials = (
        Credentials.from_authorized_user_file(str(path_token), SCOPES)
        if path_token.exists()
        else None
    )

    # No usable credentials: try a silent refresh, else let the user log in.
    if not (credentials and credentials.valid):
        refreshed = False
        if credentials and credentials.expired and credentials.refresh_token:
            try:
                credentials.refresh(Request())
            except google.auth.exceptions.RefreshError:
                # Refresh was rejected; fall through to the interactive flow.
                pass
            else:
                refreshed = True

        if not refreshed:
            oauth_flow = InstalledAppFlow.from_client_secrets_file(
                str(path_client_secrets),
                SCOPES,
            )
            credentials = oauth_flow.run_local_server(port=0)

        # Persist the new or refreshed credentials for the next run.
        path_token.write_text(credentials.to_json())

    return build("gmail", "v1", credentials=credentials)
57
58
def list_threads(service):
    """
    List threads in the authenticated user's mailbox and dump the response.

    Calls ``users().threads().list`` for ``userId="me"`` and writes the
    response, as indented JSON, to ``path_temp_json`` (overwriting it). Only
    the single response returned by this one call is written; no follow-up
    page requests are made.

    :param service: Gmail API service object exposing ``users()``.
    """
    # Query the threads collection rather than messages, so the function does
    # what its name says and the dumped ids are thread ids usable with
    # get_thread_details().
    res = service.users().threads().list(userId="me").execute()
    path_temp_json.write_text(json.dumps(res, indent=4))
62
63
def get_thread_details(service, thread_id: str):
    """
    Fetch a single thread and dump the response to the scratch JSON file.

    Calls ``users().threads().get`` for ``userId="me"`` with the given id and
    writes the response, as indented JSON, to ``path_temp_json``
    (overwriting it).

    :param service: Gmail API service object exposing ``users()``.
    :param thread_id: id of the thread to retrieve.
    """
    request = service.users().threads().get(userId="me", id=thread_id)
    payload = json.dumps(request.execute(), indent=4)
    path_temp_json.write_text(payload)
67
68
if __name__ == "__main__":
    # Authorize first, then write the listing response to tmp.json.
    gmail_service = auth()
    list_threads(gmail_service)
    # To inspect one thread instead, swap in:
    # get_thread_details(gmail_service, thread_id="18ec9466c9dc95e2")