import gspread
import datetime
import re
from googleapiclient.discovery import build
import time as tm
import csv
import os
import pandas as pd
sa = gspread.service_account(filename="/Users/jitendersingh/Documents/sheetsapi.json")
sh = sa.open('YT_Channel_views')
sheet = sh.worksheet('Sheet1')
# Replace with your API key
API_KEY = "" #jin634
# API_KEY = "" #Jitenders634
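# Note: the key could also be read from an environment variable instead of being
# hard-coded. A minimal sketch, assuming a variable named YT_API_KEY is set
# (this variable name is an assumption, not part of the original setup):
# API_KEY = os.environ.get("YT_API_KEY", "")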
# Create a YouTube Data API client
youtube = build("youtube", "v3", developerKey=API_KEY)
# Function to get each video's title, view count, and link for a channel
def get_channel_videos(channel_id):
    videos = []
    next_page_token = None

    # Get the "uploads" playlist ID for the channel
    channels_response = youtube.channels().list(
        id=channel_id,
        part='contentDetails'
    ).execute()
    uploads_playlist_id = channels_response['items'][0]['contentDetails']['relatedPlaylists']['uploads']

    while True:
        # Fetch a page of videos from the "uploads" playlist
        playlist_items_request = youtube.playlistItems().list(
            playlistId=uploads_playlist_id,
            part='snippet',
            # maxResults=50,  # uncomment to fetch up to 50 items per page (the API default is 5)
            pageToken=next_page_token
        )
        playlist_items_response = playlist_items_request.execute()

        # Extract the video IDs on this page
        video_ids = [item['snippet']['resourceId']['videoId'] for item in playlist_items_response['items']]

        # Fetch details and statistics for those video IDs
        videos_request = youtube.videos().list(
            part="snippet,statistics",
            id=','.join(video_ids)
        )
        videos_response = videos_request.execute()

        # Process each video
        for item in videos_response['items']:
            video = {
                "title": item['snippet']['title'],
                "views": item['statistics']['viewCount'],
                "video_link": f"https://www.youtube.com/watch?v={item['id']}"
            }
            videos.append(video)

        # Check if there are more pages of results
        next_page_token = playlist_items_response.get("nextPageToken")
        if not next_page_token:
            break

    return videos
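# For reference, get_channel_videos returns a list of dicts shaped like this
# (illustrative values only):
#   [{"title": "Video title", "views": "12345",
#     "video_link": "https://www.youtube.com/watch?v=VIDEO_ID"}]
# Note that the API returns 'viewCount' as a string, not an int.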
# Function to get a YouTube channel ID from a URL or @username (usernames sometimes won't resolve)
def get_channel_info_using_api(url):
    """
    Extracts the channel ID or username from the URL and uses the YouTube Data API
    to get the channel name and ID.
    Args:
        url: The URL of the YouTube channel.
    Returns:
        A dictionary containing 'id' and 'name' keys, or None if parsing fails.
    """
    # Regex patterns for the supported URL formats
    channel_id_pattern = r"channel/([^/]+)"
    username_pattern = r"@([^/]+)"

    # Extract the channel ID or username based on the URL format
    match = re.search(channel_id_pattern, url)
    if match:
        channel_id = match.group(1)
    else:
        match = re.search(username_pattern, url)
        if match:
            channel_username = match.group(1)
            channel_id = None  # No direct ID in the URL; resolve it via search below
        else:
            return None  # URL format not recognized

    if channel_id:
        # Get the channel information by ID
        try:
            response = youtube.channels().list(
                part="snippet", id=channel_id
            ).execute()
            channel_name = response["items"][0]["snippet"]["title"]
            return {"id": channel_id, "name": channel_name}
        except Exception:
            return None  # Handle potential errors during the API call
    else:
        # Get the channel information by username (search by channel name)
        search_results = youtube.search().list(
            q=channel_username, part="snippet", type="channel"
        ).execute()
        for result in search_results["items"]:
            # Only an exact title match is accepted; this can miss channels whose
            # display name differs from their @handle
            if result["snippet"]["title"] == channel_username:
                try:
                    channel_id = result["snippet"]["channelId"]
                    response = youtube.channels().list(
                        part="snippet", id=channel_id
                    ).execute()
                    channel_name = response["items"][0]["snippet"]["title"]
                    return {"id": channel_id, "name": channel_name}
                except Exception:
                    return None  # Handle potential errors during the API call
        return None  # Channel not found by username
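# Example usage (hypothetical URL and return values, shown only to illustrate the shape):
#   get_channel_info_using_api("https://www.youtube.com/@somehandle")
#   -> {"id": "UC...", "name": "Some Channel"} on success, or None on failure.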
# Function for the "before" pass: create each channel's worksheet and take the first snapshot
def create_worksheet_for_channel():
    # Get all cells in the first column of Sheet1 that contain a channel URL
    first_column_cells = sheet.findall(re.compile(r'^https?://'))
    for cell in first_column_cells:
        url = cell.value
        # Check if the corresponding cell in the second column is empty
        second_column_value = sheet.cell(cell.row, 2).value
        if not second_column_value:
            # Get channel details
            channel_info = get_channel_info_using_api(url)
            if channel_info:
                channel_name = channel_info["name"]
                channel_id = channel_info["id"]
                # Check if a worksheet with the same name already exists
                existing_titles = [worksheet.title for worksheet in sh.worksheets()]
                if channel_name in existing_titles:
                    print(f"Worksheet '{channel_name}' already available.")
                else:
                    # Create a new worksheet with the channel name
                    new_worksheet = sh.add_worksheet(title=channel_name, cols="9", rows="18000")
                    # Add headers to the new worksheet
                    headers = ["Title", "Views Earlier", "Link", "Title", "Views Later", "Link"]
                    new_worksheet.insert_row(headers, index=1)
                    print(f"Worksheet '{channel_name}' created.")
                # Get the videos using the channel ID
                print(f"Getting the videos of '{channel_name}'.")
                videos = get_channel_videos(channel_id)
                print(f"Got the videos of '{channel_name}'.")
                csv_file_path = "/Users/jitendersingh/Documents/youtube_csv/" + channel_name + " BEFORE.csv"
                save_to_csv(videos, csv_file_path)
                print(f"Saved the csv file of '{channel_name}'.")
                sheet.update_cell(cell.row, 2, channel_name)
                # Get the current date and time
                current_date = datetime.datetime.now().strftime("%d %B %Y")  # e.g. "05 March 2024"
                current_time = datetime.datetime.now().strftime("%H:%M")  # 24-hour format
                # Update cells
                sheet.update_cell(cell.row, 3, current_date)  # Current date in the third column
                sheet.update_cell(cell.row, 4, current_time)  # Current time in the fourth column
            else:
                print(f"Failed to get channel info for URL: {url}")
        else:
            # The second column is already filled, so this channel was processed earlier
            channel_info = get_channel_info_using_api(url)
            if channel_info:
                channel_name = channel_info["name"]
                print(f"First process already done for channel '{channel_name}'.")
# Save the fetched videos to a CSV file as a backup
def save_to_csv(videos, file_path):
    # Define the field names (columns) for the CSV file
    field_names = ["Title", "Views", "Video Link"]
    # Write the video information to the CSV file
    with open(file_path, mode="w", newline="", encoding="utf-8") as file:
        writer = csv.DictWriter(file, fieldnames=field_names)
        writer.writeheader()
        for video in videos:
            writer.writerow({
                "Title": video["title"],
                "Views": video["views"],
                "Video Link": video["video_link"]
            })
# Function to align the BEFORE and AFTER CSV files and merge them side by side,
# dropping videos that only appear at the top of the newer file
def synchronize_csv_files(old_csv_path, new_csv_path, output_csv_path):
    # Read the old and new CSV files into DataFrames
    old_df = pd.read_csv(old_csv_path)
    new_df = pd.read_csv(new_csv_path)
    # Use the value in the first column of the second row of the OLD CSV as the anchor
    matching_value = old_df.iloc[1, 0]
    # Drop rows from the top of the new CSV until the anchor value lines up
    while new_df.iloc[1, 0] != matching_value:
        new_df = new_df.iloc[1:]
        # Reset the index of the new DataFrame so the rows align on concat
        new_df.reset_index(drop=True, inplace=True)
    # Combine the columns of both CSV files into a single DataFrame
    combined_df = pd.concat([old_df, new_df], axis=1)
    # Save the synchronized CSV to the output file
    combined_df.to_csv(output_csv_path, index=False)
# Compute the view-count difference for each video
def calculate_numbers(input_csv_path, output_csv_path):
    # Read the synchronized CSV file into a DataFrame
    df = pd.read_csv(input_csv_path)
    # Add a GAP column: the later view count (Views.1) minus the earlier one (Views)
    df['GAP'] = df['Views.1'] - df['Views']
    # Save the modified CSV to the output file with a new name
    df.to_csv(output_csv_path, index=False)
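# Worked example: if a video shows Views = 1000 in the BEFORE file and
# Views.1 = 1250 in the AFTER file, its GAP value will be 250.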
# Function for the "after 24 hours" pass; it drives all the helper functions above
def worksheet_after():
    # Get all cells in the first column of Sheet1 that contain a channel URL
    first_column_cells = sheet.findall(re.compile(r'^https?://'))
    for cell in first_column_cells:
        url = cell.value
        # Check if the corresponding cell in the fifth column is empty
        fifth_column_value = sheet.cell(cell.row, 5).value
        if not fifth_column_value:
            # Get channel details
            channel_info = get_channel_info_using_api(url)
            if channel_info:
                raw_channel_name = channel_info["name"]
                channel_name = channel_info["name"].strip().lower()  # Normalized channel name
                channel_id = channel_info["id"]
                # Re-open the spreadsheet so the worksheet list is fresh
                sa = gspread.service_account(filename="/Users/jitendersingh/Documents/sheetsapi.json")
                sh = sa.open('YT_Channel_views')
                worksheets = sh.worksheets()
                worksheet_titles = [worksheet.title.strip().lower() for worksheet in worksheets]  # Normalized worksheet titles
                # print(worksheet_titles)
                # Check if a worksheet with the same name already exists
                if channel_name in worksheet_titles:
                    print(channel_name)
                    # Get the videos using the channel ID
                    print(f"Getting the videos of '{channel_name}'.")
                    videos = get_channel_videos(channel_id)
                    print(f"Got the videos of '{channel_name}'.")
                    after_csv_file_path = "/Users/jitendersingh/Documents/youtube_csv/" + raw_channel_name + " AFTER.csv"
                    save_to_csv(videos, after_csv_file_path)
                    print(f"Saved the csv file of '{channel_name}'.")
                    old_csv_path = "/Users/jitendersingh/Documents/youtube_csv/" + raw_channel_name + " BEFORE.csv"
                    output_csv_path = "/Users/jitendersingh/Documents/youtube_csv/temp_csv.csv"
                    synchronize_csv_files(old_csv_path, after_csv_file_path, output_csv_path)
                    print(f"Synchronized the csv files of '{raw_channel_name}'.")
                    input_csv_path = "/Users/jitendersingh/Documents/youtube_csv/temp_csv.csv"
                    output_csv_path = "/Users/jitendersingh/Documents/youtube_csv/" + raw_channel_name + " 24 HOURS.csv"
                    calculate_numbers(input_csv_path, output_csv_path)
                    tm.sleep(1)
                    os.remove("/Users/jitendersingh/Documents/youtube_csv/temp_csv.csv")
                    print(f"View-count difference for '{raw_channel_name}' calculated.")
                    worksheet = sh.worksheet(raw_channel_name)
                    # Read the CSV file
                    with open(output_csv_path, 'r') as file:
                        csv_data = list(csv.reader(file))
                    # Update the worksheet with the CSV data
                    worksheet.update('A1', csv_data)
                    print(f"Sheet update for '{channel_name}' done.")
                    sheet.update_cell(cell.row, 5, "24 hours Process Done")
                    # Get the current date and time
                    current_date = datetime.datetime.now().strftime("%d %B %Y")  # e.g. "05 March 2024"
                    current_time = datetime.datetime.now().strftime("%H:%M")  # 24-hour format
                    # Update cells
                    sheet.update_cell(cell.row, 6, current_date)  # Current date in the sixth column
                    sheet.update_cell(cell.row, 7, current_time)  # Current time in the seventh column
            else:
                print(f"Failed to get channel info for URL: {url}")
        else:
            # The fifth column is already filled, so this channel was processed earlier
            channel_info = get_channel_info_using_api(url)
            if channel_info:
                channel_name = channel_info["name"]
                print(f"24 hours process already done for channel '{channel_name}'.")
# create_worksheet_for_channel()
# print("first process complete---------------------")
worksheet_after()
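# A minimal usage sketch of the intended two-pass workflow (inferred from the
# commented-out call above, not part of the original script):
# 1. Run create_worksheet_for_channel() to take the "BEFORE" snapshot and create
#    each channel's worksheet.
# 2. Roughly 24 hours later, run worksheet_after() to take the "AFTER" snapshot,
#    build the "<channel> 24 HOURS.csv" file, and push it to the channel's worksheet.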