import re
import time
import datetime

import gspread
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

sa = gspread.service_account(filename="/Users/jitendersingh/Documents/sheetsapi.json")
sh = sa.open('YT_Channel_views')
sheet = sh.worksheet('Sheet1')

# Replace with your API key
API_KEY = "YOUR_API_KEY"

# Create a YouTube Data API client
youtube = build("youtube", "v3", developerKey=API_KEY)
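# Hardcoding keys in source is risky; a safer sketch (assuming the key has been
# exported as an environment variable named YT_API_KEY) would be:
#   import os
#   API_KEY = os.environ["YT_API_KEY"]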
# Fetch title, view count, and link for every video on a channel.
def get_channel_videos(channel_id):
    videos = []
    next_page_token = None

    # Get the "uploads" playlist ID for the channel
    channels_response = youtube.channels().list(
        id=channel_id,
        part='contentDetails'
    ).execute()
    uploads_playlist_id = channels_response['items'][0]['contentDetails']['relatedPlaylists']['uploads']

    while True:
        # Fetch the videos in the "uploads" playlist
        playlist_items_request = youtube.playlistItems().list(
            playlistId=uploads_playlist_id,
            part='snippet',
            maxResults=50,  # API maximum; the default of 5 makes pagination needlessly slow
            pageToken=next_page_token
        )
        playlist_items_response = playlist_items_request.execute()

        # Extract video IDs
        video_ids = [item['snippet']['resourceId']['videoId'] for item in playlist_items_response['items']]
        if not video_ids:
            break  # Empty page; nothing left to fetch

        # Fetch video details and statistics for each video ID
        # (videos().list accepts at most 50 IDs, which matches the page size above)
        videos_request = youtube.videos().list(
            part="snippet,statistics",
            id=','.join(video_ids)
        )
        videos_response = videos_request.execute()

        # Process each video
        for item in videos_response['items']:
            video = {
                "title": item['snippet']['title'],
                "views": item['statistics']['viewCount'],
                "video_link": f"https://www.youtube.com/watch?v={item['id']}"
            }
            videos.append(video)

        # Check if there are more pages of results
        next_page_token = playlist_items_response.get("nextPageToken")
        if not next_page_token:
            break

    return videos
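# Example usage (the channel ID below is a placeholder, not a real channel):
# videos = get_channel_videos("UCxxxxxxxxxxxxxxxxxxxxxx")
# print(videos[0])  # {'title': ..., 'views': ..., 'video_link': ...}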
# Get a channel's ID and name from its URL or @username (username lookups sometimes fail).
def get_channel_info_using_api(url):
    """
    Extracts the channel ID or username from a URL and uses the YouTube Data API
    to resolve the channel name and ID.

    Args:
        url: The URL of the YouTube channel.

    Returns:
        A dictionary containing 'id' and 'name' keys, or None if parsing fails.
    """
    # Regex patterns for the supported URL formats
    channel_id_pattern = r"channel/([^/]+)"
    username_pattern = r"@([^/]+)"

    # Extract channel ID or username based on URL format
    match = re.search(channel_id_pattern, url)
    if match:
        channel_id = match.group(1)
    else:
        match = re.search(username_pattern, url)
        if match:
            channel_username = match.group(1)
            channel_id = None  # No ID yet; resolve it via search below
        else:
            return None  # URL format not recognized

    # Use the module-level YouTube Data API client built above
    if channel_id:
        # Get channel information by ID
        try:
            response = youtube.channels().list(
                part="snippet", id=channel_id
            ).execute()
            channel_name = response["items"][0]["snippet"]["title"]
            return {"id": channel_id, "name": channel_name}
        except (HttpError, IndexError, KeyError):
            return None  # API call failed or returned no items
    else:
        # Get channel information by username (search by channel name)
        search_results = youtube.search().list(
            q=channel_username, part="snippet", type="channel"
        ).execute()
        for result in search_results["items"]:
            if result["snippet"]["title"] == channel_username:
                try:
                    channel_id = result["snippet"]["channelId"]
                    response = youtube.channels().list(
                        part="snippet", id=channel_id
                    ).execute()
                    channel_name = response["items"][0]["snippet"]["title"]
                    return {"id": channel_id, "name": channel_name}
                except (HttpError, IndexError, KeyError):
                    return None  # API call failed or returned no items
        return None  # Channel not found by username
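# Example usage (both supported URL styles; the ID and handle are placeholders):
# get_channel_info_using_api("https://www.youtube.com/channel/UCxxxxxxxxxxxxxxxxxxxxxx")
# get_channel_info_using_api("https://www.youtube.com/@SomeHandle")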
# Save the initial ("before 24 hours") snapshot into the channel's worksheet.
def save_to_google_sheets(videos, worksheet_name, spreadsheet):
    try:
        # Try to open the specified worksheet, create it if it doesn't exist
        try:
            worksheet = spreadsheet.worksheet(worksheet_name)
        except gspread.exceptions.WorksheetNotFound:
            worksheet = spreadsheet.add_worksheet(title=worksheet_name, cols=20, rows=18000)

        # Get the existing data range to append new data below it
        existing_data = worksheet.get_all_values()
        start_row = len(existing_data) + 1  # First empty row after existing data

        # Prepare the data to be written
        data = [[video["title"], video["views"], video["video_link"]] for video in videos]

        # Append the data to the worksheet in batches
        batch_size = 100  # Number of rows to write per request
        for i in range(0, len(data), batch_size):
            batch = data[i:i + batch_size]
            end_row = start_row + len(batch) - 1
            cell_range = f"A{start_row}:C{end_row}"  # "Before" data occupies columns A-C
            # Keyword arguments avoid the positional-order change between gspread 5.x and 6.x
            worksheet.update(range_name=cell_range, values=batch)
            start_row = end_row + 1

        print("Data saved to Google Sheets successfully.")
    except gspread.exceptions.APIError as e:
        print("Error: Unable to access Google Sheets API.")
        print(e)
# Save the follow-up ("after 24 hours") snapshot into the channel's worksheet.
def save_to_google_sheets_after(videos, worksheet_name, spreadsheet):
    try:
        # Try to open the specified worksheet, if it exists
        worksheet = None
        worksheets = spreadsheet.worksheets()
        for ws in worksheets:
            if ws.title.lower() == worksheet_name.lower():  # Case-insensitive comparison
                worksheet = ws
                break
        if not worksheet:
            print(f"Worksheet '{worksheet_name}' not found.")
            return  # Exit the function if the worksheet is not found

        start_row = 2  # First data row (row 1 holds the headers)

        # Prepare the data to be written
        data = [[video["title"], video["views"], video["video_link"]] for video in videos]

        # Append the data to the worksheet in batches
        batch_size = 100  # Number of rows to write per request
        for i in range(0, len(data), batch_size):
            batch = data[i:i + batch_size]
            end_row = start_row + len(batch) - 1
            cell_range = f"D{start_row}:F{end_row}"  # "After" data occupies columns D-F
            worksheet.update(range_name=cell_range, values=batch)
            start_row = end_row + 1

        print("Data saved to Google Sheets successfully.")
    except gspread.exceptions.APIError as e:
        print("Error: Unable to access Google Sheets API.")
        print(e)
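# Both save functions above only report APIError (typically a rate-limit hit).
# A minimal retry-with-backoff wrapper is sketched below; it is not wired into
# the functions above, and the retry count and base delay are assumptions to
# tune against your own quota.
def call_with_backoff(func, *args, retries=3, base_delay=1.0, **kwargs):
    """Call func, retrying on gspread APIError with exponential backoff."""
    for attempt in range(retries):
        try:
            return func(*args, **kwargs)
        except gspread.exceptions.APIError:
            if attempt == retries - 1:
                raise  # Out of retries; let the caller handle it
            time.sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...

# Example: call_with_backoff(worksheet.update, range_name="A1:C1", values=[["t", "v", "l"]])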
# Create the "before 24 hours" worksheet for each channel URL listed in Sheet1.
def create_worksheet_for_channel():
    # Get all URL cells in the first column of Sheet1
    first_column_cells = sheet.findall(re.compile(r'^https?://'), in_column=1)
    for cell in first_column_cells:
        url = cell.value
        # Check if the corresponding cell in the second column is empty
        second_column_value = sheet.cell(cell.row, 2).value
        if not second_column_value:
            # Get channel details
            channel_info = get_channel_info_using_api(url)
            if channel_info:
                channel_name = channel_info["name"]
                channel_id = channel_info["id"]
                # Check if a worksheet with the same name already exists
                # (compare against titles; sh.worksheets() returns Worksheet objects)
                if channel_name in [ws.title for ws in sh.worksheets()]:
                    print(f"Worksheet '{channel_name}' already available.")
                else:
                    # Create a new worksheet with the channel name
                    new_worksheet = sh.add_worksheet(title=channel_name, cols=9, rows=18000)
                    # Add headers to the new worksheet
                    headers = ["Title", "Views Earlier", "Link", "Title", "Views Later", "Link"]
                    new_worksheet.insert_row(headers, index=1)
                    print(f"Worksheet '{channel_name}' created.")

                # Get videos using the channel ID
                videos = get_channel_videos(channel_id)
                print(f"Got the videos of '{channel_name}'.")

                # Save videos into the Google Sheet
                save_to_google_sheets(videos, channel_name, sh)
                print(f"Saved into Google Sheet: '{channel_name}'.")
                sheet.update_cell(cell.row, 2, "First Process Done")

                # Get current date and time
                current_date = datetime.datetime.now().strftime("%d-%m-%Y")  # dd-mm-yyyy
                current_time = datetime.datetime.now().strftime("%H:%M")  # 24-hour format

                # Record when the first snapshot was taken
                sheet.update_cell(cell.row, 3, current_date)  # Date in the third column
                sheet.update_cell(cell.row, 4, current_time)  # Time in the fourth column
            else:
                print(f"Failed to get channel info for URL: {url}")
        else:
            # Get channel name from the URL
            channel_info = get_channel_info_using_api(url)
            if channel_info:
                channel_name = channel_info["name"]
                print(f"First process already done for channel '{channel_name}'.")
# Delete one row's worth of cells (3 columns) and shift the cells below them up.
def delete_and_shift_up(sheet, start_row, start_col, num_rows, num_cols):
    """
    Delete cells in the specified range and shift the remaining cells up.

    Args:
    - sheet: gspread Worksheet object
    - start_row: int, starting row index (1-based)
    - start_col: int, starting column index (1-based)
    - num_rows: int, number of rows to delete
    - num_cols: int, number of columns to delete
    """
    # Get the sheet ID
    sheet_id = sheet.id

    # Construct the request to delete the range
    delete_request = {
        "deleteRange": {
            "range": {
                "sheetId": sheet_id,
                "startRowIndex": start_row - 1,  # Adjust for 0-based index
                "endRowIndex": start_row + num_rows - 1,
                "startColumnIndex": start_col - 1,  # Adjust for 0-based index
                "endColumnIndex": start_col + num_cols - 1
            },
            "shiftDimension": "ROWS"  # Shift remaining cells up
        }
    }

    # Send the batch update request
    sheet.spreadsheet.batch_update({"requests": [delete_request]})
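# Example: remove the "after" triplet in row 2 (columns D-F) and pull the
# rows below it up by one:
# delete_and_shift_up(worksheet, start_row=2, start_col=4, num_rows=1, num_cols=3)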
# Clear the stale "after 24 hours" rows, shift the rest up, and add the diff formula.
def synchronize_google_sheet(sheet):
    print("Correcting rows: start.")

    # Get the value from row 2, column 3
    value_to_match = sheet.cell(2, 3).value

    # Find the matching row number in column 6
    # (one col_values() call instead of one cell() API call per row)
    matching_row = None
    for row, value in enumerate(sheet.col_values(6), start=1):
        if value == value_to_match:
            matching_row = row
            print(f"Match found in row {matching_row}.")
            break
    if matching_row is None:
        print("Value not found in column 6.")
        return

    # Clear cells in columns D-F above the matching row in a single request
    if matching_row > 2:
        sheet.batch_clear([f"D2:F{matching_row - 1}"])

    # Shift the remaining D-F cells up into the cleared rows
    for i in range(matching_row - 2):
        delete_and_shift_up(sheet, 2, 4, 1, 3)

    # Put the view-count difference formula "=ABS(B1-E1)" into G1
    # (USER_ENTERED makes Sheets evaluate it instead of storing literal text)
    formula = "=ABS(B1-E1)"
    sheet.update(range_name="G1", values=[[formula]], value_input_option="USER_ENTERED")
    print("Updating rows completed.")
# Run the "after 24 hours" update; ties together all of the functions above.
def worksheet_after():
    # Get all URL cells in the first column of Sheet1
    first_column_cells = sheet.findall(re.compile(r'^https?://'), in_column=1)
    for cell in first_column_cells:
        url = cell.value
        # Check if the corresponding cell in the fifth column is empty
        fifth_column_value = sheet.cell(cell.row, 5).value
        if not fifth_column_value:
            # Get channel details
            channel_info = get_channel_info_using_api(url)
            if channel_info:
                channel_name = channel_info["name"].strip().lower()  # Normalize channel name
                channel_id = channel_info["id"]
                worksheets = sh.worksheets()
                worksheet_titles = [ws.title.strip().lower() for ws in worksheets]  # Normalized titles

                # Check if a worksheet with the same name already exists
                if channel_name in worksheet_titles:
                    print(channel_name)

                    # Get videos using the channel ID
                    videos = get_channel_videos(channel_id)
                    print(f"Got the videos of '{channel_name}'.")

                    # Save videos into the Google Sheet
                    save_to_google_sheets_after(videos, channel_name, sh)
                    print(f"After data saved into Google Sheet: '{channel_name}'.")

                    # Correct extra rows (pass the Worksheet object, not the name string)
                    channel_worksheet = worksheets[worksheet_titles.index(channel_name)]
                    synchronize_google_sheet(channel_worksheet)

                    sheet.update_cell(cell.row, 5, "24 hours Process Done")

                    # Get current date and time
                    current_date = datetime.datetime.now().strftime("%d-%m-%Y")  # dd-mm-yyyy
                    current_time = datetime.datetime.now().strftime("%H:%M")  # 24-hour format

                    # Record when the second snapshot was taken
                    sheet.update_cell(cell.row, 6, current_date)  # Date in the sixth column
                    sheet.update_cell(cell.row, 7, current_time)  # Time in the seventh column
            else:
                print(f"Failed to get channel info for URL: {url}")
        else:
            # Get channel name from the URL
            channel_info = get_channel_info_using_api(url)
            if channel_info:
                channel_name = channel_info["name"]
                print(f"24 hours process already done for channel '{channel_name}'.")
if __name__ == "__main__":
    create_worksheet_for_channel()
    # print("first process complete---------------------")
    worksheet_after()
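# The two phases are meant to run roughly 24 hours apart. One way to arrange
# that (an assumption — any scheduler works) is a daily cron entry, e.g.:
#   0 9 * * * /usr/bin/python3 /path/to/this_script.py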