Board logo

标题: sqlserver数据导入mysql五:多线程导数据脚本 [打印本页]

作者: look_w    时间: 2019-4-19 20:15     标题: sqlserver数据导入mysql五:多线程导数据脚本

#!/usr/bin/perl use Encode; use Encode::CN; use DBI; use Switch; use strict; use Net::HandlerSocket; use threads; use Time::HiRes 'time'; my $aim_ip="192.168.0.208"; my $aim_db_name = "mysqldb"; my $hs_port = 9999; my $source_name = "sqldb"; my $source_user_name = "sa"; my $source_user_psd = "123"; my $db_name="mysqldb"; my $location="192.168.0.208"; my $port="3306"; my $db_user="zoe"; my $db_pass="123"; my $dbh=DBI->connect("dbi:ODBCsource_name",$source_user_name,$source_user_psd); # my $sth=$dbh->prepare("select name,object_id from sys.all_objects ao where type='U' and not exists( # select 1 from  sys.all_columns col where col.object_id=ao.object_id and system_type_id=240)"); my $sth=$dbh->prepare("select name,object_id from sys.all_objects where type='U' and is_ms_shipped=0 and name <>'sysdiagrams'"); $sth->execute(); my $threads_cnt=(not defined $ARGV[0])?10ARGV[0]; my $per_records=(not defined $ARGV[1])?3400ARGV[1]; my @data; my $datacount; my $n=0; my $ok=0; my $geo=0; #print "请输入数字确认运行第几份表的操作"; #my $var=0; #$var=<STDIN>; #chop ($var); my $var=$ARGV[2]; my $openname="alltablename_exportname_"."$var"."\.txt"; my $losetxtname="alltablename_loseprimary_"."$var"."\.txt"; my $bigtxtname="alltablename_bigcount_"."$var"."\.txt"; my $geotxtname='alltablename_geo_'.$var.'.txt'; my $okname="alltablename_ok_"."$var"."\.txt"; my $logname="alltablename_errorlog_"."$var"."\.txt"; my $repairname="alltablename_repair_"."$var"."\.txt"; my $linename="alltablename_line_"."$var"."\.txt"; open(FILE,">>$losetxtname"); syswrite(FILE,"缺少主键的表有:\n"); close(FILE); open(FILE,">>$bigtxtname"); syswrite(FILE,"超过千万条的表有:\n"); close(FILE); my($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst); my $format_time; my @a; open IN, "<", "$openname" or die "IN: $!"; while (<IN>) { chomp; if(defined($_ )) { push @a,$_; } } close IN; while (@data=$sth->fetchrow_array()) { $datacount=0; $ok=0; $geo=0; my ($select_columns,$insert_columns,$column_count,$sort_column,$column_types); ($select_columns,$insert_columns,$column_count,$sort_column,$column_types)=get_columns($data[0],$data[1]); $n+=1; if($ok ==1){ print '该表有主键'."\n"; if($geo ==1){ if($data[0]~~@a) { print '该表有地理值'."\n"; open(FILE,">>$geotxtname"); syswrite(FILE,"$n\n"); syswrite(FILE,"$data[0]\n"); ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime(time()); $format_time=sprintf("%d-%d-%d %d:%d:%d",$year+1990,$mon+1,$mday,$hour,$min,$sec); syswrite(FILE,$format_time."\n"); close(FILE); my $relt = export_data_in ($select_columns,$insert_columns,$column_count,$sort_column,$data[0],$column_types); open(FILE,">>$okname"); syswrite(FILE,$n."\n"); syswrite(FILE,$data[0]."\n"); syswrite(FILE,$datacount."\n"); ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime(time()); $format_time=sprintf("%d-%d-%d %d:%d:%d",$year+1990,$mon+1,$mday,$hour,$min,$sec); syswrite(FILE,$format_time."\n"); close(FILE); open(FILE,">>$linename"); syswrite(FILE,'数量:'.$datacount."\n"); syswrite(FILE,'完成'.$data[0].'复制'."\n"); syswrite(FILE,$format_time."\n"); close(FILE); } } else { if($data[0]~~@a) { print '该表在表名单内'."\n"; open(FILE,">>$linename"); syswrite(FILE,$data[0].'开始复制表'."\n"); ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime(time()); $format_time=sprintf("%d-%d-%d %d:%d:%d",$year+1990,$mon+1,$mday,$hour,$min,$sec); syswrite(FILE,$format_time."\n"); close(FILE); my $relt = export_data_in ($select_columns,$insert_columns,$column_count,$sort_column,$data[0],$column_types); open(FILE,">>$okname"); syswrite(FILE,$n."\n"); syswrite(FILE,$data[0]."\n"); syswrite(FILE,$datacount."\n"); ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime(time()); $format_time=sprintf("%d-%d-%d %d:%d:%d",$year+1990,$mon+1,$mday,$hour,$min,$sec); syswrite(FILE,$format_time."\n"); close(FILE); open(FILE,">>$linename"); syswrite(FILE,'数量:'.$datacount."\n"); syswrite(FILE,'完成'.$data[0].'复制'."\n"); syswrite(FILE,$format_time."\n"); close(FILE); } else { print '该表不在表名单内'."\n"; } } } else { if($data[0]~~@a) { print '该表无主键'."\n"; open(FILE,">>$losetxtname"); syswrite(FILE,"$n\n"); syswrite(FILE,"$data[0]\n"); ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime(time()); $format_time=sprintf("%d-%d-%d %d:%d:%d",$year+1990,$mon+1,$mday,$hour,$min,$sec); syswrite(FILE,$format_time."\n"); close(FILE); } } } $sth->finish; $dbh->disconnect; print '所有表的入库执行完毕!!!!'."\n"; sub export_data_in
     { my($select_columns,$insert_columns,$columns_count,$sort_column,$table_name,$column_types) = @_; print '开始读取mysql中的数量------------'."\n"; my $data_base = "DBI:mysqlaim_db_nameaim_ipport"; my $dbhmysql=DBI -> connect($data_base,$db_user,$db_pass); $dbhmysql->do("SET character_set_client = 'utf8'"); $dbhmysql->do("SET character_set_connection = 'utf8'"); my $mysql="select count(1),max($sort_column) from $aim_db_name\.$table_name"; print "$mysql\n"; #print "执行语句$mysql\n"; my $mysqlsth=$dbhmysql->prepare($mysql); $mysqlsth->execute() or die "ERROR:_[0]:mysqlsth->errstr"; my @data_count1=$mysqlsth->fetchrow_array(); my    $mysqlcount=@data_count1[0]; my $mymaxid=@data_count1[1]; $mysqlsth->finish; $dbhmysql->disconnect; print "mysql中已有数量mysqlcount\n目前id:$mymaxid\n"; $sort_column="[$sort_column]"; #构建SQL print '开始读取sqlserver中的数量------------'."\n"; my $rows_count=0; my $dbh2=DBI->connect("dbi:ODBCsource_name",$source_user_name,$source_user_psd); my $sth_sc=$dbh2->prepare("select count(1),min($sort_column),max($sort_column) from $table_name"); $sth_sc->execute(); my @data_count=$sth_sc->fetchrow_array(); $datacount=@data_count[0]; my $minid=@data_count[1]; my $maxid=@data_count[2]; print 'sqlserver中数量为:'. $datacount."\n"; if($datacount>=10000000) { open(FILE,">>$bigtxtname"); syswrite(FILE,"$n\n"); syswrite(FILE,"$table_name\n"); close(FILE); } if($mysqlcount==0) { $mymaxid=$minid; } else { $mymaxid=$mymaxid+1; } if($mysqlcount<$datacount) { my $begin_cnt = $mymaxid; ##这里不-1,会报21 my $end_cnt =$begin_cnt+$per_records ; my $thread; while($begin_cnt-1-$per_records < $maxid) { if($datacount<$per_records) { my $sql_select="SELECT $select_columns  FROM $table_name  where $sort_column BETWEEN $begin_cnt and $end_cnt"; ##开线程。参数请参照上边的描述 export_data($table_name,$sql_select,$insert_columns,$columns_count,$column_types,$datacount,$maxid,$begin_cnt,$end_cnt); $begin_cnt = $begin_cnt + $per_records+1; $end_cnt = $end_cnt + $per_records+1; } else { while(scalar(threads->list())<$threads_cnt) { my $sql_select="SELECT $select_columns  FROM $table_name  where $sort_column BETWEEN $begin_cnt and $end_cnt"; ##开线程。参数请参照上边的描述 threads->new(\&export_data, $table_name,$sql_select,$insert_columns,$columns_count,$column_types,$datacount,$maxid,$begin_cnt,$end_cnt); $begin_cnt = $begin_cnt + $per_records+1; $end_cnt = $end_cnt + $per_records+1; } foreach $thread(threads->list(threads::all)) { if($thread->is_joinable()) { $thread->join(); } } } } foreach $thread(threads->list(threads::all)) { $thread->join(); } } $sth_sc->finish; $dbh2->disconnect; }




欢迎光临 电子技术论坛_中国专业的电子工程师学习交流社区-中电网技术论坛 (http://bbs.eccn.com/) Powered by Discuz! 7.0.0