26 Jul 2022
def get_email(res):
    """Collect every e-mail-looking substring found in ``res.text``.

    Args:
        res: Any object exposing the text to scan as ``res.text``
            (presumably an HTTP response -- confirm with callers).

    Returns:
        list[str]: Matches in order of appearance; duplicates are kept.
    """
    import re

    # Case-insensitive: local part, "@", then a host ending in ".<letters>".
    email_pattern = re.compile(r"[a-z0-9.\-+_]+@[a-z0-9.\-+_]+\.[a-z]+", re.I)
    return email_pattern.findall(res.text)
def is_website_root(url):
    """Tell whether ``url`` points at a site root.

    Args:
        url: URL string to inspect.

    Returns:
        bool: True when the parsed path is empty or exactly ``/``.
    """
    from urllib.parse import urlparse

    path = urlparse(url).path
    return not path or path == '/'
def get_website_root(url):
    """Reduce ``url`` to ``scheme://netloc``.

    Args:
        url: URL string that must carry a scheme.

    Returns:
        str: The scheme and network location joined by ``://``.

    Raises:
        KeyError: If the parsed URL has no scheme.
    """
    from urllib.parse import urlparse

    parsed = urlparse(url)
    if parsed.scheme:
        return "://".join((parsed.scheme, parsed.netloc))
    raise KeyError('Invalid URL, must include http or https scheme')
def is_valid_email(email, website):
    """Check whether ``email`` belongs to the domain of ``website``.

    The website's network location is used when present; otherwise the
    path with surrounding slashes removed, which covers scheme-less input
    such as ``example.com/``. The comparison is a case-insensitive
    substring test against everything after the first ``@`` in ``email``.

    Args:
        email: E-mail address to test.
        website: URL or bare domain of the site.

    Returns:
        bool: True when the website domain occurs in the e-mail's domain.
        False when ``email`` has no ``@`` or no domain can be derived from
        ``website``.
    """
    from urllib.parse import urlparse

    site = urlparse(website)
    domain = (site.netloc if site.netloc else site.path.strip('/')).lower()
    _, at_sign, email_domain = email.partition('@')
    # An empty domain would match every address ('' is in any string), and
    # an address without '@' has no domain part to compare against.
    if not at_sign or not domain:
        return False
    return domain in email_domain.lower()
# Start-up banner: printed when this module is executed/imported, showing the
# current clock time and this file's base name.
from datetime import datetime
import os
print(f"\n>>>\n{datetime.now().strftime('%H:%M:%S')} starting {os.path.basename(__file__)}")
# Wall-clock reference taken at load time (the functions below keep their own
# per-call `start_func` timers instead of reading this one).
import time
start_time = time.time()
import ssl
import tldextract
from urllib.parse import urlparse
import csv
import pprint
import urllib3
from urllib3.exceptions import HTTPError
import urllib
import requests
import re
import httplib2
# NOTE: the next two imports repeat imports already made above.
from datetime import datetime
import time
import json
import string
import sqlite3
from dotenv import load_dotenv
# Called with no arguments, i.e. using python-dotenv's default .env lookup.
load_dotenv()
# Shared pretty-printer (4-space indent) for debugging output.
pp = pprint.PrettyPrinter(indent=4)
####################
# OPEN DB CONNECTION
# Module-wide connection and cursor; `c` is reused by functions further down.
db = sqlite3.connect('/path/to/local/sqlite/database/database.db')
c = db.cursor()

####################
# COUNTRY
# Build the country lookup dicts needed by the functions, from the local DB:
#   dict_country           name -> code (seeded with 'UK')
#   dict_country_extended  string -> code (replaced wholesale further below)
#   dict_EU                code -> bool EU flag
#   dict_ID                code -> rowid
dict_country = dict(UK='UK')
dict_country_extended = dict(UK='UK')
dict_EU = dict()
dict_ID = dict()

c.execute("""
SELECT rowid, code, name, eu
FROM countries;
""")
for row in c.fetchall():
    # NOTE(review): `id` shadows the builtin at module level; kept because
    # these loop names are module globals other code may read.
    id = row[0]
    code = row[1].strip()
    name = row[2].strip()
    eu = row[3]
    # Only the text 'true' (any casing) counts as an EU member.
    eu = (eu.lower() == 'true')
    dict_country[name] = dict_country_extended[name] = code
    dict_EU[code] = eu
    dict_ID[code] = id

# dict_country_extended
# Rebinds the name to the contents of countries_extended, discarding the
# entries written to it above.
c.execute("""
SELECT string, country_code
FROM countries_extended;
""")
dict_country_extended = dict((pair[0], pair[1]) for pair in c.fetchall())
def country_code_from_tld(tld):
    """Look up the country code stored for ``tld`` in the ``tlds`` table.

    The whole table is read through the module-level cursor ``c`` on every
    call and turned into a tld -> country_code mapping.

    Args:
        tld: TLD string exactly as stored in the table.

    Returns:
        The ``country_code`` value stored for ``tld``.

    Raises:
        KeyError: If ``tld`` is not present in the table.
    """
    c.execute("""
SELECT tld, country_code
FROM tlds;
""")
    code_by_tld = {}
    for record in c.fetchall():
        code_by_tld[record[0]] = record[1]
    return code_by_tld[tld]
def get_country_code(country_name):
    """Translate a country name into its code via ``dict_country_extended``.

    ``'UK'`` is special-cased and maps to itself; only that path prints the
    elapsed-time line.

    Args:
        country_name: Name (or alias) to look up.

    Returns:
        The matching country code, or None when the name is unknown.
    """
    timer_start = time.time()
    if country_name != 'UK':
        if country_name in dict_country_extended:
            return dict_country_extended[country_name]
        return None
    elapsed = round((time.time() - timer_start), 2)
    print(f'\n{elapsed}s for func: get_country_code\n')
    return 'UK'
def get_list_country_codes():
    """Return the module-level ``dict_country`` mapping (name -> code).

    Despite the name, the result is the dict itself (not a copy and not a
    list). An elapsed-time line is printed on every call.
    """
    timer_start = time.time()
    country_codes = dict_country
    elapsed = round((time.time() - timer_start), 2)
    print(f'\n{elapsed}s for func: get_list_country_codes\n')
    return country_codes
def get_country_id_from_code(code):
    """Return the id stored in ``dict_ID`` for ``code``.

    Raises:
        KeyError: If ``code`` is not a key of ``dict_ID``.
    """
    return dict_ID[code]
def get_country_code_from_id(id):
    """Reverse lookup in ``dict_ID``: return the first code whose id equals ``id``.

    Returns:
        The matching country code, or None when no entry has that id.
    """
    matching_codes = (code for code, row_id in dict_ID.items() if row_id == id)
    return next(matching_codes, None)
def is_in_EU(country_code):
    """Return the EU flag stored in ``dict_EU`` for ``country_code``.

    Raises:
        KeyError: If ``country_code`` is not a key of ``dict_EU``.
    """
    return dict_EU[country_code]
def non_eu_tlds():  # returns List
    """List the TLDs flagged as non-EU in the ``tlds.csv`` file.

    Every row is considered (no header is skipped). A row contributes its
    first column, stripped, when its third column is exactly ``'False'``.

    Returns:
        list[str]: The selected TLDs in file order.
    """
    with open('/path/to/file/tlds.csv', 'r', newline='', encoding='UTF-8') as handle:
        rows = list(csv.reader(handle, delimiter=","))
    return [record[0].strip() for record in rows if record[2] == 'False']
def find_country_code(input_text):
    """Find the first ``dict_country_extended`` key contained in ``input_text``.

    Keys are tried in dict order with a case-insensitive substring test.

    Args:
        input_text: Free text to search.

    Returns:
        The country code of the first key that occurs in the text, or None.
    """
    for alias, country_code in dict_country_extended.items():
        if alias.lower() in input_text.lower():
            return country_code
    return None
def get_call_dict():
    """Load the ``country_call.csv`` file into a dict.

    The first row is skipped as a header. For every other row the first
    column becomes the key and the second column the value; later rows
    overwrite earlier ones with the same key. An elapsed-time line is
    printed on every call.

    Returns:
        dict[str, str]: First column -> second column.
    """
    timer_start = time.time()
    with open('/path/to/file/country_call.csv', 'r', newline='', encoding='UTF-8') as handle:
        rows = csv.reader(handle, delimiter=",")
        next(rows)  # header row
        records = list(rows)
    call_codes = {record[0]: record[1] for record in records}
    elapsed = round((time.time() - timer_start), 2)
    print(f'\n{elapsed}s for func: get_call_dict\n')
    return call_codes
def country_by_email_domain(email):
    """Guess a country code from the domain part of an e-mail address.

    The codes returned by ``get_list_country_codes()`` are tried first as a
    leading label (``uk.example.com``) and, if none matches, as a trailing
    label (``example.fr``). The first matching code wins.

    Earlier versions reset the result to ``''`` on every non-matching
    iteration, so only the last code in the dict could ever be returned;
    the search now stops at the first hit.

    Args:
        email: E-mail address to inspect.

    Returns:
        str: The matching country code, or ``''`` when nothing matches or
        the address has no ``@``.
    """
    start_func = time.time()
    country = ''

    pieces = email.split('@')
    # Domains are case-insensitive, so compare in lower case.
    full_domain = pieces[1].strip().lower() if len(pieces) > 1 else ''

    if full_domain:
        codes = list(get_list_country_codes().values())
        # Pass 1: code as sub-domain prefix, e.g. "uk.".
        for code in codes:
            if full_domain.startswith(f"{code.lower()}."):
                country = code
                break
        # Pass 2: code as top-level suffix, e.g. ".fr".
        if country == '':
            for code in codes:
                if full_domain.endswith(f".{code.lower()}"):
                    country = code
                    break

    run_time_func = round((time.time() - start_func), 2)
    print(f'\n{run_time_func}s for func: country_by_email_domain\n')
    return country
def country_by_phone(phone_as_string):
    """Guess a country code from the leading digits of a phone number.

    Three passes, each run only if the previous one found nothing:
      1. international prefixes (``+``, ``+ ``, ``00``, ``00 ``, ``++``)
         followed by a calling code from ``get_call_dict()``;
      2. a US-style layout (``(123)``, ``123-`` or ``123.`` at the start);
      3. a bare calling code at the start that is not immediately repeated.

    Args:
        phone_as_string: Phone number as text; surrounding whitespace is
            ignored.

    Returns:
        str: The country code found, or ``''``.
    """
    timer_start = time.time()
    number = phone_as_string.strip()
    call_codes = get_call_dict()
    country = ''

    # NOTE(review): there is no early exit, so when several calling codes
    # match, the one that comes last in the CSV wins -- confirm intended.
    for call_code, country_code in call_codes.items():
        international_forms = (
            f"+{call_code}",
            f"+ {call_code}",
            f"00{call_code}",
            f"00 {call_code}",
            f"++{call_code}",
        )
        if number.startswith(international_forms):
            country = country_code

    if country == '':
        us_layout = re.compile(r'^\(\d{3}\)|^\d{3}\-|^\d{3}\.')
        if us_layout.search(number) is not None:
            country = 'US'

    # Second pass on country_call list for numbers left, starting just with country code
    if country == '':
        for call_code, country_code in call_codes.items():
            starts_with_code = number.startswith(f"{call_code}")
            code_repeated = number.startswith(f"{call_code}{call_code}")
            if starts_with_code and not code_repeated:
                country = country_code

    elapsed = round((time.time() - timer_start), 2)
    print(f'\n{elapsed}s for func: country_by_phone\n')
    return country
def _first_column(db_path, query, params=()):
    """Run ``query`` on the SQLite file at ``db_path``; return column 0 of each row."""
    connection = sqlite3.connect(db_path)
    try:
        return [row[0] for row in connection.execute(query, params)]
    finally:
        # Always release the connection, even when the query fails.
        connection.close()


def _capitalise_name(fragment):
    """Capitalise a name fragment, keeping hyphenated parts (``anne-marie``) intact."""
    separator = '-' if '-' in fragment else None
    return string.capwords(fragment, sep=separator)


def first_from_email(email, db_path='/path/to/file/database.db'):
    """Guess a first name (or ``'team'``) from the local part of an e-mail.

    Steps, in order:
      1. ``hi@`` or any prefix containing an entry of the ``team_emails``
         table -> ``'team'``.
      2. ``first.last@`` / ``first_last@`` -> the part before the separator,
         capitalised (only if longer than one character; ``_`` is applied
         after ``.`` and overrides it).
      3. ``first@`` -> exact, case-insensitive match in the ``first`` table.
      4. ``firstlast@`` -> the longest name from the ``first`` table that the
         prefix starts with.

    Args:
        email: E-mail address; only the text before the first ``@`` is used.
        db_path: SQLite file holding the ``team_emails`` and ``first``
            tables. Every lookup uses this one path (the last lookup used to
            open a relative path and never closed its connection).

    Returns:
        str: ``'team'``, a first name, or ``''`` when nothing could be found.
    """
    start_func = time.time()
    email_prefix = email.split('@')[0].lower()
    first = ''

    # First identify if known team email
    team_emails = _first_column(db_path, "SELECT email FROM team_emails;")
    if email_prefix == 'hi' or any(ele.lower() in email_prefix for ele in team_emails):
        first = 'team'
    else:
        # Now try to get first name if not team email
        for separator in ('.', '_'):
            if separator in email_prefix:
                pre_prefix = email_prefix.split(separator)[0]
                if len(pre_prefix) > 1:
                    first = _capitalise_name(pre_prefix)

        if first == '':
            # Catch first@ -- plain equality rather than LIKE, so "_" and "%"
            # in the prefix are not treated as SQL wildcards.
            exact_matches = _first_column(
                db_path,
                "SELECT first FROM first WHERE lower(first) = ?;",
                (email_prefix,),
            )
            if exact_matches:
                first = str(exact_matches[0])

        if first == '':
            # Catch firstlast@ -- take the longest known first name that
            # the prefix starts with.
            known_names = _first_column(db_path, "SELECT first FROM first;")
            candidates = [
                name for name in known_names
                if isinstance(name, str) and email_prefix.startswith(name.lower())
            ]
            first = max(candidates, key=len, default='')

    run_time_func = round((time.time() - start_func), 2)
    if run_time_func > 2:
        print(f'\n{run_time_func}s for func: first_from_email\n')
    return first
## Twitter
def handle_from_long_url(long_social_url):
    """Extract the account handle from a long Twitter-style URL.

    The URL path is taken, all ``/`` characters are removed, and any
    ``twitter.com`` left in the result (scheme-less input) is dropped.

    Args:
        long_social_url: Profile URL, with or without a scheme.

    Returns:
        str: The handle.
    """
    path_without_slashes = urlparse(long_social_url).path.replace('/', '')
    return path_without_slashes.replace('twitter.com', '')
## Email
def email_to_pattern(email, firstname, lastname):
    """Turn an e-mail's local part into a naming pattern such as ``first.last``.

    Only the part before the first ``@`` is rewritten and returned; the
    domain is ignored. Replacements are applied in this order:
      1. first initial + last initial -> ``f`` + last initial (the last
         initial is kept so step 3 can still match the full last name),
      2. full first name -> ``first``,
      3. full last name -> ``last``,
      4. first initial + ``.`` -> ``f.``,
      5. last initial + ``.`` -> ``l.``.

    Empty names are skipped instead of raising, and addresses whose domain
    has no dot (or that have no ``@``) no longer raise ``IndexError``.

    Args:
        email: Address to analyse (surrounding whitespace is ignored).
        firstname: Person's first name (compared in lower case).
        lastname: Person's last name (compared in lower case).

    Returns:
        str: The rewritten local part.
    """
    start_func = time.time()
    firstname = firstname.lower()
    lastname = lastname.lower()
    pattern = email.strip().split("@")[0]

    # Slices instead of [0] so empty names yield '' rather than IndexError.
    first_initial = firstname[:1]
    last_initial = lastname[:1]
    initials = first_initial + last_initial

    if first_initial and last_initial and initials in pattern:
        pattern = pattern.replace(initials, f"f{last_initial}")
    if firstname and firstname in pattern:
        pattern = pattern.replace(firstname, "first")
    if lastname and lastname in pattern:
        pattern = pattern.replace(lastname, "last")
    if first_initial and first_initial + '.' in pattern:
        pattern = pattern.replace(first_initial + '.', "f.")
    if last_initial and last_initial + "." in pattern:
        pattern = pattern.replace(last_initial + ".", "l.")

    run_time_func = round((time.time() - start_func), 2)
    print(f'\n{run_time_func}s for func: email_to_pattern\n')
    return pattern
####################
# CLEANING
def clean_long_url(long_url):
    """Strip params, query string and fragment from ``long_url``.

    Args:
        long_url: URL to clean.

    Returns:
        str: ``scheme://netloc/path`` rebuilt from the parsed URL. The
        ``://`` is always emitted, even when the scheme is empty.
    """
    parsed = urlparse(long_url)
    return "".join((parsed.scheme, "://", parsed.netloc, parsed.path))
def clean_website_url(website_url):
    """Normalise a website URL to ``https://[subdomain.]domain.suffix``.

    The pieces come from ``tldextract.extract``; the path, query and the
    original scheme are dropped and the result is lower-cased.

    Args:
        website_url: URL or host name to normalise.

    Returns:
        str: The normalised URL.
    """
    extracted = tldextract.extract(website_url)
    host = f"{extracted.domain}.{extracted.suffix}"
    if extracted.subdomain != '':
        host = f"{extracted.subdomain}.{host}"
    return f"https://{host}".lower()
# Socials
# Path fragments that mark a social link as something other than a profile
# page (share dialogs, policy pages, login forms, images).
blacklist_socials = [
    'dialog',
    'privacy',
    'login',
    '.jpg',
    '.png',
]


def _clean_social_url(raw_url, keyword, label, canonical_root, extra_blacklist=()):
    """Shared implementation behind the ``clean_*_url`` functions.

    Args:
        raw_url: Link to clean.
        keyword: Site name that must occur in the link (``'facebook'``);
            ``<keyword>.com`` is the host that is looked for.
        label: Site name as shown in the "NOT a ... link" message.
        canonical_root: Prefix of the returned URL, ending in ``/``.
        extra_blacklist: Site-specific fragments checked in addition to
            ``blacklist_socials``.

    Returns:
        str | None: ``canonical_root`` + handle, or None when the link is
        not for this site, is a root link, or contains a blacklisted
        fragment.
    """
    if keyword not in raw_url:
        print(f"NOT a {label} link: {raw_url}")
        return None

    host = f"{keyword}.com"
    # Remove root links
    if raw_url.endswith((f"{host}/", host, f"{host}#", f"{host}/#")):
        print(f"Removing entry for: {raw_url}")
        return None

    pieces = raw_url.split(f"{host}/")
    if len(pieces) < 2:
        # Keyword present but no "<site>.com/" (e.g. another TLD): nothing
        # to extract. This used to raise IndexError.
        print(f"NOT a {label} link: {raw_url}")
        return None

    # Fix format: drop the query string, stray tabs/whitespace and one
    # trailing slash.
    handle = pieces[1].split('?')[0]
    handle = handle.replace('\t', '').strip()
    if handle.endswith('/'):
        handle = handle[:-1]

    if not handle:
        # Root link hidden behind a query string, e.g. "<site>.com/?ref=x".
        print(f"Removing entry for: {raw_url}")
        return None
    if any(fragment in handle for fragment in (*blacklist_socials, *extra_blacklist)):
        return None
    return f"{canonical_root}{handle}"


def clean_facebook_url(fb):  # facebook string
    """Normalise a Facebook link to ``https://www.facebook.com/<handle>``.

    Returns None (after printing a note for non-Facebook and root links)
    when the link cannot be used.
    """
    return _clean_social_url(fb, 'facebook', 'Facebook', 'https://www.facebook.com/')


def clean_twitter_url(tt):  # twitter string
    """Normalise a Twitter link to ``https://twitter.com/<handle>``.

    Returns None (after printing a note for non-Twitter and root links)
    when the link cannot be used.
    """
    return _clean_social_url(tt, 'twitter', 'Twitter', 'https://twitter.com/')


def clean_instagram_url(it):  # instagram string
    """Normalise an Instagram link to ``https://www.instagram.com/<handle>``.

    Returns None (after printing a note for non-Instagram and root links)
    when the link cannot be used.
    """
    return _clean_social_url(it, 'instagram', 'Instagram', 'https://www.instagram.com/')


def clean_youtube_url(yt):  # youtube string
    """Normalise a YouTube link to ``https://www.youtube.com/<path>``.

    Embedded-player links (``embed/``) are rejected in addition to the
    shared blacklist. The extra fragment is passed per call instead of
    being appended to ``blacklist_socials``, which previously grew the
    shared list on every call and changed how the other cleaners behaved.

    Returns None (after printing a note for non-YouTube and root links)
    when the link cannot be used.
    """
    return _clean_social_url(
        yt, 'youtube', 'youtube', 'https://www.youtube.com/',
        extra_blacklist=('embed/',),
    )
# URLs
def domain_from_url(url):
    """Return the lower-cased ``domain.suffix`` of ``url``.

    The pieces come from ``tldextract.extract``, so sub-domains are already
    excluded.

    Args:
        url: URL or host name.

    Returns:
        str: ``<domain>.<suffix>`` in lower case, without a leading ``www.``.
    """
    extracted = tldextract.extract(url)
    domain = f"{extracted.domain}.{extracted.suffix}".lower()
    # Strip "www." only as a leading label; replacing it anywhere would
    # corrupt names that merely end in "www" (e.g. "awww.com" -> "acom").
    if domain.startswith('www.'):
        domain = domain[len('www.'):]
    return domain
def domain_from_email(email):
    """Return the lower-cased ``domain.suffix`` of an e-mail address.

    The text between the first and second ``@`` is passed to
    ``tldextract.extract``, so sub-domains are excluded.

    Args:
        email: E-mail address (surrounding whitespace is ignored).

    Returns:
        str: ``<domain>.<suffix>`` in lower case, without a leading
        ``www.``; the input itself, unchanged, when it has no ``@``.
    """
    pieces = email.strip().split('@')
    if len(pieces) < 2:
        return email
    extracted = tldextract.extract(pieces[1])
    domain = f"{extracted.domain}.{extracted.suffix}".lower()
    # Strip "www." only as a leading label; replacing it anywhere would
    # corrupt names that merely end in "www" (e.g. "awww.com" -> "acom").
    if domain.startswith('www.'):
        domain = domain[len('www.'):]
    return domain
def suffix_from_domain(domain):
    """Return the public suffix of ``domain`` with a leading dot.

    Args:
        domain: Domain or URL handed to ``tldextract.extract``.

    Returns:
        str: ``.`` followed by the extracted suffix (just ``.`` when the
        suffix is empty).
    """
    extracted = tldextract.extract(domain)
    return f".{extracted.suffix}"
def url_from_domain(domain):
    """Resolve ``domain`` to the URL it finally lands on.

    A quick httplib2 GET is tried first and its ``content-location`` header
    is returned. If that fails for any reason, a headless Chrome session
    (Selenium) loads the page and the browser's current URL is returned.

    Chrome is started only when the fallback is needed, and exactly once;
    the page-load timeout is applied to that driver and the driver is
    always quit afterwards. Previously two drivers were started on every
    call, the timeout was set on the one that was thrown away, and neither
    was ever closed.

    Args:
        domain: Bare domain (``example.com``) or a full URL. ``https://``
            is prepended when no scheme is present.

    Returns:
        str: The resolved URL, or ``''`` when both attempts fail.

    Raises:
        Exception: Whatever Selenium raises if the Chrome driver itself
            cannot be started in the fallback.
    """
    start_func = time.time()

    def _report_runtime():
        run_time_func = round((time.time() - start_func), 2)
        print(f'{run_time_func}s for func: url_from_domain')

    target = domain if '://' in domain else f"https://{domain}"

    # Attempt 1: plain HTTP request, short timeout.
    try:
        print(datetime.now().strftime('%H:%M:%S'), " - testing with httplib2")
        h = httplib2.Http(".cache", timeout=4)
        response, _content = h.request(target, "GET")
        url = dict(response)['content-location']
        print(datetime.now().strftime('%H:%M:%S'), ' - ', url)
        _report_runtime()
        return url
    except Exception:
        # Deliberate best-effort: any failure (network error, missing
        # header, ...) falls through to the browser-based attempt.
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        pass

    # Attempt 2: real browser. Imported lazily so the fast path above does
    # not require Selenium or a Chrome driver.
    from selenium import webdriver
    from selenium.webdriver.chrome.service import Service

    s = Service('/path/to/file/chromedriver')  # need to download relevant Chrome driver first: https://chromedriver.chromium.org/downloads
    opts = webdriver.ChromeOptions()
    opts.add_argument("--headless")
    driver = webdriver.Chrome(service=s, options=opts)
    try:
        driver.set_page_load_timeout(30)
        print(datetime.now().strftime('%H:%M:%S'), f" - testing {target} with Selenium")
        driver.get(target)
        get_url = driver.current_url
        print(get_url)
        return get_url
    except Exception:
        return ''
    finally:
        # Always shut the browser down so Chrome processes do not pile up.
        driver.quit()
        _report_runtime()